目录
一、MQTT简介
二、MQTT使用方法
三、MQTT驱动设计
四、代码解析
五、使用过程
六、总结
一、MQTT简介
MQTT因为其轻量、高效和稳定的特点,特别适合作为物联网系统的数据传输协议,已经成为物联网事实上的通信标准了。关于协议的具体内容看看这篇文章和官方文档MQTT协议详解(完整版)-CSDN博客,在这里我们主要讲解使用方法。
作为嵌入式设备,设备资源比较紧张,我们这里选用开源库paho mqtt,开源地址在这儿GitHub - eclipse/paho.mqtt.embedded-c: Paho MQTT C client library for embedded systems. Paho is an Eclipse IoT project (https://iot.eclipse.org/)
我们项目里已经都整理好了,直接用就行了,具体如下图所示,从映射文件可以看出,mqtt开源库大概占用2KB的 ROM,已经很轻量化了。这个开源库的核心作用就是可以帮我们根据协议要求组合要发送的数据,或者拆解接收到的数据,而应用层不用去太关心协议本身的内容。
二、MQTT使用方法
MQTT是以服务器为中心,客户端对为对象,话题为关系纽带的一种通讯协议,在这个体系里,净化器设备是客户端,用户手机也是客户端,手机订阅净化器发布的话题,服务器就会把净化器发布的消息推送给手机;同样的道理,手机根据设备订阅的话题来发布消息,就可以对净化器设备进行控制了。
下图是净化器项目的话题,其中11223344是设备的序列号,对于所有净化器的数据手机都能收的到,手机针对某个净化器的数据也只有某个净化器能接收,其它序列号的设备收不到。这里面的核心逻辑都是服务器根据话题来区分运行的。
三、MQTT驱动设计
MQTT的驱动应该算是比较难的,首先要确定它的地位和作用,如下图所示,drv_mqtt是作为设备端mqtt的核心,整合了底层的开源库、物理层的收发接口和应用层的参数配置功能,以及自身的连接、收发、订阅/取消订阅等功能。
下面进入代码进行解析,从头文件开始,MQTTPacket.h主要包含了mqtt开源库的功能文件,这个应该没什么问题,下面的ringbuffer.h需要强调下,它是RT-Thread的功能,叫环形缓冲区,就是数据按顺序环形保存,取出的时候按照先进先出的原则,MQTT开源库需要按顺序取出数据解析,有这个ringbuffer作为缓存媒介在操作上非常便捷,这也是使用RT-Thread的另一个重要原因了。
接下来是宏定义的内容,没什么特殊情况默认即可,有需要改变的在user_opt.h中重定义即可,具体的内容都有注释,就不赘述了。
订阅话题是个重要组成部分,在这里定义了话题的三个状态,空闲、订阅和取消订阅,取消订阅一般用不到,特殊情况下会有一些临时话题,为了缓解资源,可以取消订阅。结构体里的base_msg_id主要是为了标记 订阅/取消订阅 时返回的话题,这样程序才能区分。
最后是最重要的客户端连接信息了,具体都有注释,其中用户名、密码和客户端ID都是指针,在应用层定义这些信息需要用全局变量或者静态变量,才能保证信息的完整性;同样的,收发函数也是采用回调的方式,在应用程根据不同的物理接口进行注册,这里我们采用的自然是esp8266的收发函数了。
四、代码解析
先从初始化开始,主要就是对用户名、密码和客户端ID进行赋值。
/*
================================================================================
描述 : 初始化指定MQTT连接
输入 :
输出 :
================================================================================
*/
void drv_mqtt_init(u8 index, char *usr_name, char *passwd, char *client_id)
{
if(index<MQTT_CONN_NUM)
{
MqttClientStruct *pClient=&g_sMqttWork.client_list[index];
MQTTPacket_connectData connect_init = MQTTPacket_connectData_initializer;
if((pClient->rb=rt_ringbuffer_create(MQTT_RING_BUFF_SIZE))!=NULL )
{
memcpy(&pClient->condata, &connect_init, sizeof(connect_init));//复制连接初始化信息
pClient->condata.keepAliveInterval=MQTT_KEEP_TIME;
pClient->condata.username.cstring=usr_name;//用户名
pClient->condata.password.cstring=passwd;//密码
pClient->condata.clientID.cstring=client_id;//客户ID
pClient->is_enable=true;
}
}
}
接下来就是连接和订阅了,在这里就可以很清晰的看到mqtt开源库的作用了,就是组合连接、订阅和取消订阅的报文。MQTT里也有保活功能,这是协议层的,如果指定时间内没有没有收到数据,那么会自己发个ping请求包来保持连接。
/*
================================================================================
描述 : 连接和订阅
输入 :
输出 :
================================================================================
*/
void drv_mqtt_connect(void)
{
static u32 last_sec_time=0;
static u8 make_buff[80]={0};
const int make_size=sizeof(make_buff);
int make_len;
u32 now_sec_time=drv_get_sec_counter();
if(now_sec_time-last_sec_time>=2)
{
static u8 conn_ptr=0;
if(conn_ptr>=MQTT_CONN_NUM)
conn_ptr=0;
MqttClientStruct *pClient=&g_sMqttWork.client_list[conn_ptr];
if(pClient->is_enable)
{
if(pClient->is_connected==false)
{
memset(make_buff, 0, make_size);
make_len=MQTTSerialize_connect(make_buff, make_size, &pClient->condata);//组合连接请求包
if(pClient->mqtt_send!=NULL)
{
// printf("client=%d, mqtt send connect! make_len=%d\n",conn_ptr, make_len);
pClient->mqtt_send(make_buff, make_len);//发送
}
}
else
{
//订阅话题
for(u8 i=0; i<MQTT_SUB_NUM; i++)
{
SubPackStruct *pSub=&pClient->sub_list[i];
if(strlen(pSub->sub_topic)>0 && pSub->curr_state!=pSub->dst_state)
{
if(pSub->dst_state==TopicStateSub)//需要订阅
{
MQTTString topicString = MQTTString_initializer;
int req_qos=1;
topicString.cstring=pSub->sub_topic;
memset(make_buff, 0, make_size);
make_len = MQTTSerialize_subscribe(make_buff, make_size, 0, pSub->base_msg_id, 1, &topicString, &req_qos);//组合订阅报文
if(pClient->mqtt_send!=NULL)
{
printf("sub topic=%s\n", pSub->sub_topic);
pClient->mqtt_send(make_buff, make_len);//发送
}
}
else if(pSub->dst_state==TopicStateUnSub)//需要取消订阅
{
MQTTString topicString = MQTTString_initializer;
topicString.cstring=pSub->sub_topic;
memset(make_buff, 0, make_size);
make_len = MQTTSerialize_unsubscribe(make_buff, make_size, 0, pSub->base_msg_id, 1, &topicString);//组合取消订阅报文
if(pClient->mqtt_send!=NULL)
{
printf("unsub topic=%s\n", pSub->sub_topic);
pClient->mqtt_send(make_buff, make_len);//发送
}
}
break;//每次只订阅一个,避免堵塞
}
}
//超时检测
u32 det_time=now_sec_time-pClient->keep_time;
if(det_time>=MQTT_KEEP_TIME)
{
printf("mqtt sock_id=%d timeout, close!\n", conn_ptr);
drv_mqtt_close(pClient);//超时关闭
}
else if(det_time>=MQTT_KEEP_TIME-10)
{
//发送ping请求,保活
memset(make_buff, 0, make_size);
make_len=MQTTSerialize_pingreq(make_buff, make_size);//组合ping包
if(pClient->mqtt_send!=NULL)
{
// printf("sock=%d, mqtt send ping req! make_len=%d\n",conn_ptr,make_len);
pClient->mqtt_send(make_buff, make_len);//发送
}
}
}
}
conn_ptr++;
last_sec_time=drv_get_sec_counter();
}
}
接收部分的逻辑是MQTTPacket_read函数调用回调函数pClient->mqtt_recv获取环形缓冲区内的数据并按照协议解析,最后根据解析结果执行相应动作,消息类型如下图所示,常用的是连接回复、收到发布数据、订阅回复、取消订阅回复、ping回复和断开连接。
/*
================================================================================
描述 : 接收检查
输入 :
输出 :
================================================================================
*/
void drv_mqtt_recv_check(void)
{
static u8 make_buff[MQTT_SUB_BUFF_SIZE];
const int make_size=sizeof(make_buff);
int rc;
u8 dup;
int qos;
u8 retained;
u16 msgid;
int payloadlen_in;
u8 *payload_in;
MQTTString receivedTopic;
for(u8 i=0; i<MQTT_CONN_NUM; i++)
{
MqttClientStruct *pClient=&g_sMqttWork.client_list[i];
if(pClient->is_enable==true)//启用
{
rc=MQTTPacket_read(make_buff, make_size, pClient->mqtt_recv);
switch(rc)
{
case CONNACK://连接回复
{
printf("mqtt_id=%d CONNACK!\n", i);
u8 sessionPresent, connack_rc;
if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, make_buff, make_size) != 1 || connack_rc != 0)//解析收到的回复报文
{
drv_mqtt_close(pClient);
printf("mqtt sock_id=%d Unable to connect, return code %d\n",i, connack_rc);
}
else
{
pClient->is_connected=true;
pClient->keep_time=drv_get_sec_counter();//更新时间
printf("mqtt sock_id=%d connect ok!\n", i);
}
break;
}
case PUBREC:
case PUBACK: //发布回复
{
// debug("sock_id=%d PUBACK!\n", i);
break;
}
case PUBLISH://收到发布的消息
{
pClient->keep_time=drv_get_sec_counter();//更新时间
printf("sock_id=%d PUBLISH!\n", i);
rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic, &payload_in, &payloadlen_in, make_buff, make_size);
char *pTopic=receivedTopic.lenstring.data;
if(g_sMqttWork.mqtt_recv_parse!=NULL)
{
char topic[30]={0};
int len=(char*)payload_in-pTopic;//topic 长度
if(len>sizeof(topic))
{
len=sizeof(topic)-1;
}
memcpy(topic, pTopic, len);
g_sMqttWork.mqtt_recv_parse(i, topic, payload_in, payloadlen_in);//应用层数据解析
}
break;
}
case SUBACK://订阅回复
{
// debug("sock_id=%d SUBACK!\n", i);
// printf_hex("sub buff=", make_buff, 30);
int count, requestedQoSs[1];
MQTTDeserialize_suback(&msgid, 1, &count, requestedQoSs, make_buff, make_size);
// debug("$$$ msgid=0x%04X\n", msgid);
for(u8 k=0; k<MQTT_SUB_NUM; k++)
{
SubPackStruct *pSub=&pClient->sub_list[k];
if(pSub->base_msg_id==msgid)
{
printf("topic=%s sub ok!\n", pSub->sub_topic);
pSub->curr_state=TopicStateSub;
// pSub->subed_time=drv_get_sec_counter();
}
}
break;
}
case UNSUBACK://取消订阅回复
{
// debug("sock_id=%d UNSUBACK!\n", i);
MQTTDeserialize_unsuback(&msgid, make_buff, make_size);
// debug("$$$ msgid=0x%04X\n", msgid);
for(u8 k=0; k<MQTT_SUB_NUM; k++)
{
SubPackStruct *pSub=&pClient->sub_list[k];
if(pSub->base_msg_id==msgid)
{
printf("topic=%s unsub ok!\n", pSub->sub_topic);
pSub->curr_state=TopicStateUnSub;
// pSub->subed_time=drv_get_sec_counter();
}
}
break;
}
case PINGRESP://ping回复
{
pClient->keep_time=drv_get_sec_counter();//更新时间
// debug("sock_id=%d PINGRESP!\n", i);
break;
}
case DISCONNECT://断开连接
{
printf("mqtt_id=%d DISCONNECT!\n", i);
drv_mqtt_close(pClient);
break;
}
}
}
}
}
剩下的就是一些简单的功能了,比如设置话题、发布消息,关闭连接等等,较为简单。
/*
================================================================================
描述 : 设置话题信息
输入 :
输出 :
================================================================================
*/
void drv_mqtt_set_topic_info(u8 client_id, u8 sub_id, char *topic, u32 base_msg_id, u8 dst_state)
{
if(client_id<MQTT_CONN_NUM)
{
MqttClientStruct *pClient=&g_sMqttWork.client_list[client_id];
if(sub_id<MQTT_SUB_NUM)
{
SubPackStruct *pSub=&pClient->sub_list[sub_id];
if(strlen(topic)<sizeof(pSub->sub_topic))
{
pSub->curr_state=TopicStateIdel;
pSub->dst_state=dst_state;
pSub->base_msg_id=base_msg_id;
strcpy(pSub->sub_topic, topic);
}
}
}
}
/*
================================================================================
描述 : 设置话题订阅状态
输入 :
输出 :
================================================================================
*/
void drv_mqtt_set_topic_state(u8 client_id, u8 sub_id, u8 dst_state)
{
if(client_id<MQTT_CONN_NUM)
{
MqttClientStruct *pClient=&g_sMqttWork.client_list[client_id];
if(sub_id<MQTT_SUB_NUM)
{
SubPackStruct *pSub=&pClient->sub_list[sub_id];
pSub->dst_state=dst_state;
}
}
}
/*
================================================================================
描述 : MQTT发布数据
输入 :
输出 :
================================================================================
*/
void drv_mqtt_publish(u8 index, u8 *msg_buff, u16 msg_len, char *topic)
{
static u8 make_buff[MQTT_PUB_BUFF_SIZE]={0};
static const int make_size=sizeof(make_buff);
u16 make_len=0;
if(index<MQTT_CONN_NUM)
{
MqttClientStruct *pClient=&g_sMqttWork.client_list[index];
if(pClient->is_connected==true)//已经连接
{
pClient->msg_id++;
MQTTString topicString = MQTTString_initializer;
topicString.cstring=topic;
make_len = MQTTSerialize_publish(make_buff, make_size, 0, 1,0, pClient->msg_id, topicString, msg_buff, msg_len);//组合发布报文
if(pClient->mqtt_send!=NULL && make_len>0)
{
int ret=pClient->mqtt_send(make_buff, make_len);//发送
}
}
}
}
/*
================================================================================
描述 : 关闭连接
输入 :
输出 :
================================================================================
*/
void drv_mqtt_close(MqttClientStruct *pClient)
{
pClient->is_connected=false;
for(u8 i=0; i<MQTT_SUB_NUM; i++)
{
SubPackStruct *pSub=&pClient->sub_list[i];
pSub->curr_state=TopicStateIdel;
// pSub->subed_time=0;
}
pClient->msg_id=0;
pClient->keep_time=0;
}
五、使用过程
应用层的使用主要就是根据要求配置信息,首先物理通讯接口先设置,这里使用esp8266的连接3作为网络链路,同时注册接收函数把数据缓存进ringbuffer;然后就是MQTT用户名、密码、客户端ID的设置了;接下来有三个回调函数注册,两个是物理层的MQTT收发,还有一个是应用层的数据解析,这里已经来到了最后的净化器项目本身了,由此可以看出,要想代码好维护,写代码之前就要分层设计,这样出问题了才好分级排查,再后期自己阅读时逻辑也更走得通;最后一步就是话题订阅了,这样才能收到用户的控制数据,每个设备订阅话题都不一样,最后都带上了自己序列号,这样用户端才能针对性控制设备。
下面代码是净化器应用层的数据解析。
/*
================================================================================
描述 : 设备解析服务器下发的数据
输入 :
输出 :
================================================================================
*/
void app_air_recv_parse(u8 *buff, u16 len)
{
u8 head[2]={0xAA, 0x55};
u8 *pData=memstr(buff, len, head, 2);
if(pData!=NULL)
{
u16 total_len=pData[2]<<8 | pData[3];
u16 crcValue=pData[total_len]<<8 | pData[total_len+1];
if(crcValue==drv_crc16(pData, total_len))
{
pData+=4;
u32 device_sn=pData[0]<<24|pData[1]<<16|pData[2]<<8|pData[3];
pData+=4;
if(device_sn!=g_sAirWork.device_sn)//识别码确认
return;
u8 cmd_type=pData[0];
pData++;
switch(cmd_type)
{
case AIR_CMD_HEART://心跳包
{
break;
}
case AIR_CMD_DATA://数据包
{
break;
}
case AIR_CMD_SET_SPEED://设置风速
{
u8 speed=pData[0];
pData+=1;
app_motor_set_speed(speed);
break;
}
case AIR_CMD_SET_SWITCH://设置开关
{
u8 state=pData[0];
pData+=1;
g_sAirWork.switch_state=state;
if(state>0)
{
app_motor_set_speed(100);//启动风扇
}
else
{
app_motor_set_speed(0);//停止风扇
}
app_air_send_status();
break;
}
}
}
}
}
六、总结
MQTT协议本身较为繁琐,现在应用阶段暂时不用太深入,先学会使用就行,用熟了再去查阅文档,这样理解起来更透彻。mqtt的驱动设计相较于其他驱动文件更为复杂,因为它所牵涉的内容更广,有开源库、网络链路、应用层参数配置等等,完整的工程在第二篇文章里有的下载,自行查阅。
本项目的交流QQ群:701889554
写于2024-4-1