前言
上一章我们用开发板通过SNTP协议获取网络协议,本章我们介绍一下开发板通过配置MQTT连接到服务器上,并且订阅和发布消息。
什么是MQTT?
MQTT是一种轻量级的消息传输协议,旨在在物联网(IoT)应用中实现设备间的可靠通信。它使用发布-订阅模式,其中包括一个MQTT服务端(代理或服务器)和多个MQTT客户端之间的通信。MQTT协议具有以下特点:
- 轻量级:MQTT协议设计简单,协议头部开销小,适用于资源受限的设备和网络。
- 低带宽消耗:MQTT采用二进制编码,有效地利用网络带宽。
- 异步通信:客户端可以随时发布和订阅消息,无需等待对方的响应。
- 发布-订阅模式:消息发布者将消息发布到特定的主题,而订阅者则订阅感兴趣的主题。这种模式支持松耦合的通信和灵活的消息传递。
报文介绍
报文格式
MQTT控制报文由三部分组成,分别是固定报头,可变报头,有效载荷。
固定报头
固定报头最少由两个字节组成,第一个字节的7-4位为协议类型,3-0位为标志位,从第二个字节开始为剩余长度(包括可变报头和有效载荷的长度)
协议类型具体定义可参考下表:
标志位可以参考下表:
其中:
DUP1 = 控制报文的重复分发标志
QoS2 = PUBLISH 报文的服务质量等级
RETAIN3 = PUBLISH 报文的保留标志
协议类型示例如下表:
剩余长度字段最多四个字节,最少一个字节,具体长度如下表所示:
其中,每个字节的6-0位用于编码数据,第7位是标志位,为1则表示下一个字节也是剩余长度字段。
可变报头
某些控制报文包含可变报头,它在固定报头(Fixed header)和有效载荷(Payload)之间。每个协议的可变报头都不一样。
其中大多数协议都会有的字段是报文标识符。
可变报头在各个控制报文的详细内容中再展开讲解。
有效载荷
有效载荷是除控制报文格式以外的有效信息,CONNECT、PUBLISH、SUBSCRIBE等需要传递有效信息的协议帧都需要。
实例讲解
MQTT报文的具体格式可以参考文档:MQTT Version 3.1.1 (oasis-open.org)
连接MQTT服务器(客户端->服务器)
(以下皆为HEX格式)
//固定报头
10 21(剩余33个字节)
//可变报头
00 04 4D 51 54 54 04 C2 00 3C
//clientid,长度8字节,文本内容为clientid
00 08 63 6C 69 65 6E 74 69 64
//用户名,长度4字节,本文内容为MQTT
00 04 4D 51 54 54
//密码,长度5字节,本文内容为w5500
00 05 77 35 35 30 30
确认连接(服务器->客户端)
//连接成功,会话为新会话
20 02 00 00
订阅主题(客户端->服务器)
//固定报头,剩余长度10字节
82 0A
//可变报头
00 01
//有效载荷(长度5字节,内容为topic,qos为0)
00 05 74 6F 70 39 63 00
确认订阅(服务器->客户端)
//固定报头,剩余长度3字节
90 03
//可变报头
00 01
//有效载荷,回复订阅qos为0
00
发布消息(qos0)
//固定报头,qos0消息,非重传,非保留,剩余长度10字节
30 10
//可变报头,5个字节的主题“topic”,报文标识符1
00 05 74 6F 70 69 63 00 01
//有效载荷“message”
6D 65 73 73 61 67 65
连接方式
开发板和主机都接在路由器LAN口
连接MQTTX服务器测试
相关代码
我们打开例程中的mqtt_client.c文件,首先可以看到,我们定义了MQTT协议的收发报文缓存和MQTT所使用的socket号
#define MQTT_SEND_BUFF_SIZE 2048 // MQTT协议发送报文缓存大小
#define MQTT_RECV_BUFF_SIZE 2048 // MQTT协议接收报文缓存大小
#define MQTT_SOCKET 1 // MQTT使用的SOCKET号
uint8_t mqtt_send_buff[MQTT_SEND_BUFF_SIZE] = {0}; // MQTT协议发送报文缓存
uint8_t mqtt_recv_buff[MQTT_RECV_BUFF_SIZE] = {0}; // MQTT协议接收报文缓存
然后定义一个结构体来存放连接参数和订阅发布主题参数
// MQTT连接和订阅参数结构体
typedef struct MQTTCONNECTION
{
uint8_t server_ip[4];
int port;
char clientid[1024];
char username[1024];
char passwd[1024];
char pubtopic[255];
char subtopic[255];
int QOS;
} mqttconn;
// MQTT连接和订阅参数
mqttconn mqtt_params = {
.server_ip = {54, 244, 173, 190},
.port = 1883,
.clientid = "9a1d7719a8ac40d29311f26c5c5469dc",
.username = "mqtt_username",
.passwd = "123456",
.pubtopic = "W5500",
.subtopic = "W5500",
.QOS = 0,
};
网络地址参数如下
static wiz_NetInfo net_info = {
.mac = {0x00, 0x08, 0xdc, 0x16, 0xed, 0x2e},
.ip = {192, 168, 1, 20},
.sn = {255, 255, 255, 0},
.gw = {192, 168, 1, 1},
.dns = {8, 8, 8, 8},
.dhcp = NETINFO_STATIC};
并定义了三个全局变量用来存放连接MQTT的信息
MQTTClient c = {0}; // MQTT客户端连接信息结构体
Network n = {0}; // 网络信息结构体
int connOK; //连接状态
此外,还需定义四个函数
首先是一个1ms的循环定时器回调函数,在这个回调函数中,我们必须把mqtt_interface.c库文件中的MilliTimer_Handler()函数加入到我们的1ms定时器回调函数中。
bool repeating_timer_callback(struct repeating_timer *t)
{
MilliTimer_Handler();
return true;
}
其次是mqtt初始化函数,在这个函数中,我们连接并且订阅主题,最后发布一条消息上去。
void mqtt_init(void)
{
int ret;
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
NewNetwork(&n, MQTT_SOCKET);
ConnectNetwork(&n, mqtt_params.server_ip, 1883);
MQTTClientInit(&c, &n, 1000, mqtt_send_buff, MQTT_SEND_BUFF_SIZE, mqtt_recv_buff, MQTT_RECV_BUFF_SIZE);
data.willFlag = 0;
data.MQTTVersion = 3;
data.clientID.cstring = mqtt_params.clientid;
data.username.cstring = mqtt_params.username;
data.password.cstring = mqtt_params.passwd;
data.keepAliveInterval = 30;
data.cleansession = 1;
// 连接mqtt服务器,如果连接失败则继续重连
connOK = MQTTConnect(&c, &data);
printf("Connected:%s\r\n", connOK == 0 ? "success" : "failed");
while (connOK)
{
sleep_ms(50);
connOK = MQTTConnect(&c, &data);
printf("Connected:%s\r\n", connOK == 0 ? "success" : "failed");
}
// 订阅主题,如果订阅失败则继续订阅
ret = MQTTSubscribe(&c, mqtt_params.subtopic, mqtt_params.QOS, messageArrived);
printf("Subscribing to %s\r\n", mqtt_params.subtopic);
printf("Subscribed:%s\r\n", ret == 0 ? "success" : "failed");
while (ret)
{
sleep_ms(50);
ret = MQTTSubscribe(&c, mqtt_params.subtopic, mqtt_params.QOS, messageArrived);
printf("Subscribing to %s\r\n", mqtt_params.subtopic);
printf("Subscribed:%s\r\n", ret == 0 ? "success" : "failed");
}
sleep_ms(50);
// 发布消息
MQTTMessage pubmessage = {
.qos = QOS0,
.retained = 0,
.dup = 0,
.id = 0,
};
pubmessage.payload = "hello mqtt!";
pubmessage.payloadlen = strlen(pubmessage.payload);
MQTTPublish(&c, mqtt_params.pubtopic, &pubmessage);
printf("TX:%s\r\n", pubmessage.payload);
}
然后就是消息回调函数,服务器下发的消息都会进入该函数中进行处理。
void messageArrived(MessageData *md)
{
unsigned char messagebuffer[512];
MQTTMessage *message = md->message;
if (0)//展示qos等级
{
memcpy(messagebuffer, (char *)message->payload, (int)message->payloadlen);
*(messagebuffer + (int)message->payloadlen + 1) = '\n';
printf("%s\r\n", messagebuffer);
}
if (0)//展示qos等级
printf("%.*s", (int)message->payloadlen, (char *)message->payload);
else
printf("%s%.*s%s%s", "Rx:", (int)message->payloadlen, (char *)message->payload, mqtt_params.QOS, "\r\n");
}
最后就是mqtt保活函数,该函数需要放在主函数的主循环中,否则可能导致保活失败
void keep_mqtt(void)
{
if (MQTTYield(&c, 30))
{
mqtt_init();
}
}
在主函数中,我们只需要初始化网络信息和接口,然后开启1ms循环定时器,最后初始化mqtt,然后把mqtt保活函数放入主循环中即可。
int main()
{
struct repeating_timer timer;
stdio_init_all();
sleep_ms(3000);
printf("W5500 mqtt example.\r\n");
wizchip_initialize(); // SPI初始化以及链路状态检测
wizchip_setnetinfo(&net_info); // 设置网络地址信息
print_network_information(net_info); // 打印网络地址信息
add_repeating_timer_ms(1, repeating_timer_callback, NULL, &timer); // 开启1ms循环定时器
mqtt_init(); // mqtt连接函数
while (true)
{
keep_mqtt(); // mqtt保活
}
}
测试效果
将程序编译烧录后,打开串行监视器,可以看到,成功连接并且订阅上主题,还发布了一条信息。
在MQTTX上我们也能收到开发板发布的消息,我们在MQTTX发布一条消息出去。开发板也同样能收到。
相关连接
本章例程链接:mqtt_client example