前言
上一章我们用W5100S_EVB_PICO 开发板做Ping测试,那么本章我们进行W5100S_EVB_PICO MQTT的测试。
什么是mqtt?
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
连接方式
使开发板和我们的电脑处于同一网段:
- 开发板(设备)通过网线直连主机(PC)
- 开发板和主机都接在路由器LAN口
测试工具
- MQTTX调试工具
MQTT主要控制报文
固定报头 Fixed header
每个MQTT控制报文都包含一个固定报头。
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
byte 1 | MQTT控制报文的类型 | 用于指定控制报文类型的标志位 | ||||||
byte 2... | 剩余长度 |
MQTT控制报文的类型 MQTT Control Packet type
位置:第1个字节,二进制位7-4
名字 | 值 | 报文流动方向 | 描述 |
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | 客户端到服务端 | 客户端请求连接服务端 |
CONNACK | 2 | 服务端到客户端 | 连接报文确认 |
PUBLISH | 3 | 两个方向都允许 | 发布消息 |
PUBACK | 4 | 两个方向都允许 | QoS 1消息发布收到确认 |
PUBREC | 5 | 两个方向都允许 | 发布收到(保证交付第一步) |
PUBREL | 6 | 两个方向都允许 | 发布释放(保证交付第二步) |
PUBCOMP | 7 | 两个方向都允许 | QoS 2消息发布完成(保证交互第三步) |
SUBSCRIBE | 8 | 客户端到服务端 | 客户端订阅请求 |
SUBACK | 9 | 服务端到客户端 | 订阅请求报文确认 |
UNSUBSCRIBE | 10 | 客户端到服务端 | 客户端取消订阅请求 |
UNSUBACK | 11 | 服务端到客户端 | 取消订阅报文确认 |
PINGREQ | 12 | 客户端到服务端 | 心跳请求 |
PINGRESP | 13 | 服务端到客户端 | 心跳响应 |
DISCONNECT | 14 | 客户端到服务端 | 客户端断开连接 |
Reserved | 15 | 禁止 | 保留 |
标志 Flags
固定报头第1个字节的剩余的4位 [3-0]包含每个MQTT控制报文类型特定的标志,如果收到非法的标志,接收者必须关闭网络连接。
控制报文 | 固定报头标志 | Bit 3 | Bit 2 | Bit 1 | Bit 0 |
CONNECT | Reserved | 0 | 0 | 0 | 0 |
CONNACK | Reserved | 0 | 0 | 0 | 0 |
PUBLISH | Used in MQTT 3.1.1 | DUP1 | QoS2 | QoS2 | RETAIN3 |
PUBACK | Reserved | 0 | 0 | 0 | 0 |
PUBREC | Reserved | 0 | 0 | 0 | 0 |
PUBREL | Reserved | 0 | 0 | 1 | 0 |
PUBCOMP | Reserved | 0 | 0 | 0 | 0 |
SUBSCRIBE | Reserved | 0 | 0 | 1 | 0 |
SUBACK | Reserved | 0 | 0 | 0 | 0 |
UNSUBSCRIBE | Reserved | 0 | 0 | 1 | 0 |
UNSUBACK | Reserved | 0 | 0 | 0 | 0 |
PINGREQ | Reserved | 0 | 0 | 0 | 0 |
PINGRESP | Reserved | 0 | 0 | 0 | 0 |
DISCONNECT | Reserved | 0 | 0 | 0 | 0 |
- DUP1 =控制报文的重复分发标志
- QoS2 = PUBLISH报文的服务质量等级
- RETAIN3 = PUBLISH报文的保留标志
剩余长度 Remaining Length(包括可变报头和负载的数据的长度)
使用变成编码(1到4个字节表示,即最大可表示256M,每个字节可编码128个数值+1个延续位(最高位是延续位表示是否有更多字节,低7位表示128个数值)):
字节数 | 最小值 | 最大值 |
1 | 0 (0x00) | 127 (0x7F) |
2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
可变报头 Variable header
可变报头的内容根据 控制报文类型 的不同而不同。(特别注意:部分控制报文需要 报文标识符字段)
控制报文 | 报文标识符字段 |
CONNECT | 不需要 |
CONNACK | 不需要 |
PUBLISH | 需要(如果QoS > 0) |
PUBACK | 需要 |
PUBREC | 需要 |
PUBREL | 需要 |
PUBCOMP | 需要 |
SUBSCRIBE | 需要 |
SUBACK | 需要 |
UNSUBSCRIBE | 需要 |
UNSUBACK | 需要 |
PINGREQ | 不需要 |
PINGRESP | 不需要 |
DISCONNECT | 不需要 |
PUBACK, PUBREC, PUBREL报文必须包含与最初发送的PUBLISH报文相同的报文标识符,以此来标识是同一条信息如果一个客户端要重发这个特殊的控制报文,在随后重发那个报文时,它必须使用相同的标识符。
当客户端处理完这个报文对应的确认后,这个报文标识符就释放可重用。QoS 1的PUBLISH对应的是PUBACK,QoS 2的PUBLISH对应的是PUBCOMP
有效载荷 Payload
有效载荷是除控制报文格式以外的有效信息,CONNECT、PUBLISH、SUBSCRIBE等需要传递有效信息的协议帧都需要。
控制报文 | 有效载荷 |
CONNECT | 需要 |
CONNACK | 不需要 |
PUBLISH | 可选 |
PUBACK | 不需要 |
PUBREC | 不需要 |
PUBREL | 不需要 |
PUBCOMP | 不需要 |
SUBSCRIBE | 需要 |
SUBACK | 需要 |
UNSUBSCRIBE | 需要 |
UNSUBACK | 不需要 |
PINGREQ | 不需要 |
PINGRESP | 不需要 |
DISCONNECT | 不需要 |
官方协议文档:docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.pdf
测试
1.相关代码
我们打开例程中库文件的mqttx_client.c文件用到如下所示几个函数:
mqtt_init是对mqtt的信息进行初始化,把配置信息填入;messageArrived函数主要作用是讲发布和订阅的信息进行判断打印,keep_alive函数是一个心跳包,如果超过设定值没有发送心跳包就进行mqtt_init函数初始化。
void mqtt_init(void)
{
int ret;
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
NewNetwork(&n, 1);
ConnectNetwork(&n, mqtt_params.server_ip, 1883);
MQTTClientInit(&c, &n, 1000, mqtt_send_buff, MQTT_SEND_BUFF_SIZE, mqtt_recv_buff, MQTT_RECV_BUFF_SIZE);
data.willFlag = 0;
data.MQTTVersion = 3;
data.clientID.cstring = mqtt_params.clientid;
data.username.cstring = mqtt_params.username;
data.password.cstring = mqtt_params.passwd;
data.keepAliveInterval = 30;
data.cleansession = 1;
connOK = MQTTConnect(&c, &data);
printf("Connected:%s\r\n", connOK == 0 ? "success" : "failed");
printf("Subscribing to %s\r\n", mqtt_params.subtopic1);
ret = MQTTSubscribe(&c, mqtt_params.subtopic1, mqtt_params.QOS, messageArrived);
printf("Subscribed:%s\r\n", ret == 0 ? "success" : "failed");
sleep_ms(300);
MQTTMessage pubmessage={
.qos=QOS2,
.dup=0,
.retained=0,
.id=0,
};
pubmessage.payload="hello mqtt!\r\n";
pubmessage.payloadlen=strlen(pubmessage.payload);
MQTTPublish(&c,mqtt_params.pubtopic,&pubmessage);
}
void messageArrived(MessageData* md)
{
unsigned char messagebuffer[512];
MQTTMessage* message = md->message;
if (mqtt_params.QOS)
{
memcpy(messagebuffer,(char*)message->payload,(int)message->payloadlen);
*(messagebuffer + (int)message->payloadlen + 1) = '\n';
printf("%s\r\n",messagebuffer);
}
if (mqtt_params.QOS)
printf("%.*s", (int)message->payloadlen, (char*)message->payload);
else
printf("%s%.*s%s%s", "RX:",(int)message->payloadlen, (char*)message->payload, mqtt_params.QOS,"\r\n");
}
void keep_alive(void)
{
if (!connOK)
{
if (MQTTYield(&c, 30))
{
mqtt_init();
}
}
}
- 网络配置信息和之前的一样,还多加了mqtt的连接参数,其中的参数主要包括服务器ip,端口号,客户端id,用户名和密码(由于是本地测试,可以不需要设置用户名和密码)然后就是发布和订阅消息(必须和服务器上的一致)然后是QOS等级设置。
#define SOCKET_ID 0
#define ETHERNET_BUF_MAX_SIZE (1024 * 2)
#define MQTT_SEND_BUFF_SIZE 2048
#define MQTT_RECV_BUFF_SIZE 2048
uint8_t mqtt_send_buff[MQTT_SEND_BUFF_SIZE] = {0};
uint8_t mqtt_recv_buff[MQTT_RECV_BUFF_SIZE] = {0};
typedef struct MQTTCONNECTION
{
char mqttHostUrl[1024];
int port;
char clientid[1024];
char username[1024];
char passwd[1024];
uint8_t server_ip[4];
char pubtopic[255];
char subtopic1[255];
int QOS;
} mqttconn;
mqttconn mqtt_params = {
.server_ip = {54,244,173,190},
.port = 1883,
.clientid = "9a1d7719a8ac40d29311f26c5c5469dc",
.username = "mqtt_username",
.passwd = "123456",
.pubtopic = "1234",
.subtopic1 = "2345",
.QOS = 0,
};
unsigned char *data_ptr = NULL;
void network_init(void);
wiz_NetInfo net_info = {
.mac = {0x00, 0x08, 0xdc, 0x16, 0xed, 0x2e},
.ip = {192, 168, 124, 10},
.sn = {255, 255, 255, 0},
.gw = {192, 168, 124, 1},
.dns = {8, 8, 8, 8},
.dhcp = NETINFO_STATIC};
MQTTClient c = {0};
Network n = {0};
int connOK;
bool repeating_timer_callback(struct repeating_timer *t);
void mqtt_init(void);
void messageArrived(MessageData *md);
void keep_alive(void);
wiz_NetInfo get_info;
static uint8_t ethernet_buf[ETHERNET_BUF_MAX_SIZE] = {0,};
static uint8_t destip[4]={192, 168, 124, 1};
static uint16_t destport = 8080;
static uint16_t local_port =8000;
int main()
{
struct repeating_timer timer;
stdio_init_all();
sleep_ms(2000);
wizchip_initialize();
wizchip_setnetinfo(&net_info);
print_network_information(net_info);
add_repeating_timer_ms(1, repeating_timer_callback, NULL, &timer);
mqtt_init();
while(true)
{
// loopback_udpc(SOCKET_ID, ethernet_buf, destip, destport);
keep_alive();
sleep_ms(10);
}
}
bool repeating_timer_callback(struct repeating_timer *t)
{
MilliTimer_Handler();
return true;
}
- 打开MQTTX工具信息配置好mqtt的信息(代码的信息是根据这里的进行填写)
- 订阅和发布(对应代码上的订阅与发布信息)
2.测试现象
- 我们可以看到串口打印信息中打印了网络连接上且配置好网络信息,最后连接上了mqttx上的mqtt公共服务器
- 然后我们打开mqttx可以到由W5100S_EVB_PICO发来的“hello mqtt”,表示订阅成功。
- 然后用mqttx切换到发布消息,然后将信息发送给W5100S_EVB_PICO
- 最后可以看到串口打印了mqttx下发的消息,表示发布成功。
相关链接:
本章例程链接:mqtt_cliten example