MQTT背景应用
MQTT是机器对机器(M2M)/物联网(IoT)连接协议,英文全名为“Message Queuing Telemetry Transport”,“消息队列遥测传输”协议。它是专为受限设备和低带宽、高延迟或不可靠的网络而设计的,是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通信协议,该协议构建于TCP/IP协议之上,由IBM在1999年发布。
名词释义:
- Publisher:发布者
- Broker:代理(服务端)
- Subscriber:订阅者
- Topic:发布/订阅的主题
流程概述:上图中,各类传感器的角色是发布者(Publisher)。譬如,湿度传感器和温度传感器分别向接入的MQTT Broker中(周期性)发布两个主题名为“Moisture”(湿度)和“Temp”(温度)的主题;伴随着这两个主题共同发布的,还有湿度值和温度值,被称为“消息”。几个客户端的角色是订阅者SubScriber,如手机APP从Broker订阅了“Temp”主题,便能在手机上获取到温度传感器Publish在Broker中的温度值。
发布者和订阅者的角色并非是固定的,而是相对的。
发布者可以同时从Broker订阅主题,同理,订阅者也可以向Broker发布主题;即发布者可以是订阅者,订阅者也可以是发布者。
Broker可以是在线的云服务器,也可以是本地搭建的局域网客户端。
按照需求,实际上Broker自身也会保护一些订阅/发布主题的功能。
MQTT报文结构
任何通用/私有协议都是由事先规定好的、按某种规则约束的各种报文数据包组成的,MQTT也不例外。
在MQTT协议中,所有的数据包都是由最多三部分组成:固定Header+可变Header+有效载荷
固定Header是必需的,可变Header和有效载荷是非必需的。
因此,理论上来说,MQTT协议数据包的最小长度为2个字节,造就了它身边占用的额外资源消耗最小化特色。
Fixed header
固定Header由至少两个字节组成,如表2-2所示。
第一个字节的高4位描述了当前数据报文的类型,低四位定义了与报文类型相关的标志位。
第二个及之后的至多4个字节代表着剩余数据的字节长度。
Remaining Length剩余长度表示当前报文剩余部分的字节数,包括可变header和有效载荷的数据。
uMQTT的实现
uMQTT软件包是RT-Thread自主研发的,基于MQTT3.1.1协议的客户端实现,它提供了设备与MQTT Broker通讯的基本功能。
uMQTT软件包功能如下:
- 实现基础的连接、订阅、发布功能;
- 具备多重心跳保活,设备重连机制,保证mqtt在线状态,适应复杂情况;
- 支持Qos=0,QoS=1,QoS=2三种发送信息质量;
- 支持多客户端使用;
- 用户端接口简便,留有多种对外回调函数;
- 支持多种技术参数可配置,易上手,便于产品化开发;
- 功能强大,资源占用率低,支持功能可裁剪。
uMQTT的结构框架
uMQTT软件包主要用于在嵌入式设备上实现MQTT协议,软件包的主要工作基于MQTT协议实现。
软件包实现过程中主要做了:
- 根据MQTT3.1.1协议规定,进行软件包数据协议的封包解包。
- 传输层函数适配对接SAL(Socket Abstraction Layer)层。
- uMQTT客户端层,根据协议包层和传输层编写符合应用层的接口。实现基础连接、断连、订阅、取消订阅、发布消息等功能。支持qoS0/1/2三种发送信息质量。利用uplink timer定时器,实现多重心跳保活机制和设备重连机制,增加设备在线稳定性,适应复杂情况。
uMQTT客户端
想要连接Broker,嵌入式设备需要作为MQTT协议中的客户端来使用。
在uMQTT组件的umqtt.h文件中,抽象出了初始化客户端用到的MQTT配置信息,组成对应的数据结构体。
struct umqtt_info
{
rt_size_t send_size,recv_size; //发送接收缓冲区大小
const char *uri; //完整的URI(包含URI+URN)
const char *client_id; //客户端ID
const char *lwt_topic; //遗嘱主题
const char *lwt_message; //遗嘱消息
const char *user_name; //用户名
const char *password; //密码
enum umqtt_qos lwt_qos; //遗嘱QoS
umqtt_subsribe_cb lwt_cb; //遗嘱回调函数
rt_uint8_t reconnect_max_num; //最大重连次数
rt_uint32_t reconnect_interval; //最大重连时间间隔
rt_uint8_t keepalive_max_num; /* 最大保活次数 */
rt_uint32_t keepalive_interval; /* 最大保活时间间隔 */
rt_uint32_t recv_time_ms; /* 接收超时时间 */
rt_uint32_t connect_time; /* 连接超时时间 */
rt_uint32_t send_timeout; /* 上行(发布/订阅/取消订阅)超时时间 */
rt_uint32_t thread_stack_size; /* 线程栈大小 */
rt_uint8_t thread_priority; /* 线程优先级 */
#ifdef PKG_UMQTT_TEST_SHORT_KEEPALIVE_TIME
rt_uint16_t connect_keepalive_sec; /* 连接信息,保活秒数 */
#endif
}
这些配置信息一般在创建uMQTT客户端之前需要自行填写指定,譬如Broker的“URI”、“用户名”或“密码”之类的关键信息。
其它的非关键信息,如果没有指定,那么会在创建客户端函数umqtt_create中,调用umqtt_check_def_info函数来赋值为默认值。
static void umqtt_check_def_info(struct umqtt_info *info)
{
if(info)
{
if (info->send_size == 0) { info->send_size = PKG_UMQTT_INFO_DEF_SENDSIZE; }
if (info->recv_size == 0) { info->recv_size = PKG_UMQTT_INFO_DEF_RECVSIZE; }
if (info->reconnect_max_num == 0) { info->reconnect_max_num = PKG_UMQTT_INFO_DEF_RECONNECT_MAX_NUM; }
if (info->reconnect_interval == 0) { info->reconnect_interval = PKG_UMQTT_INFO_DEF_RECONNECT_INTERVAL; }
if (info->keepalive_max_num == 0) { info->keepalive_max_num = PKG_UMQTT_INFO_DEF_KEEPALIVE_MAX_NUM; }
if (info->keepalive_interval == 0) { info->keepalive_interval = PKG_UMQTT_INFO_DEF_HEARTBEAT_INTERVAL; }
if (info->connect_time == 0) { info->connect_time = PKG_UMQTT_INFO_DEF_CONNECT_TIMEOUT; }
if (info->recv_time_ms == 0) { info->recv_time_ms = PKG_UMQTT_INFO_DEF_RECV_TIMEOUT_MS; }
if (info->send_timeout == 0) { info->send_timeout = PKG_UMQTT_INFO_DEF_SEND_TIMEOUT; }
if (info->thread_stack_size == 0) { info->thread_stack_size = PKG_UMQTT_INFO_DEF_THREAD_STACK_SIZE; }
if (info->thread_priority == 0) { info->thread_priority = PKG_UMQTT_INFO_DEF_THREAD_PRIORITY; }
}
}
然而只有上述信息,是无法运行起来一个MQTT客户端的。
故在umqtt.c中,含有umqtt_info的umqtt_client结构体列出了初始化客户端用到的所有数据:
struct umqtt_client
{
int sock; /* 套接字 */
enum umqtt_client_state connect_state; /* mqtt客户端状态 */
struct umqtt_info mqtt_info; /* mqtt用户配置信息 */
rt_uint8_t reconnect_count; /* mqtt客户端重连计数 */
rt_uint8_t keepalive_count; /* mqtt保活计数 */
rt_uint32_t pingreq_last_tick; /* mqtt的PING请求上一次滴答值 */
rt_uint32_t uplink_next_tick; /* 上行连接的下一次滴答值 */
rt_uint32_t uplink_last_tick; /* 上行连接的上一次滴答值 */
rt_uint32_t reconnect_next_tick; /* 客户端断开重连时的下一次滴答值 */
rt_uint32_t reconnect_last_tick; /* 客户端断开重连时的上一次滴答值 */
rt_uint8_t *send_buf, *recv_buf; /* 收发缓冲区指针 */
rt_size_t send_len, recv_len; /* 收发数据的长度 */
rt_uint16_t packet_id; /* mqtt报文标识符 */
rt_mutex_t lock_client; /* mqtt客户端互斥锁 */
rt_mq_t msg_queue; /* mqtt客户端消息队列 */
rt_timer_t uplink_timer; /* mqtt保活重连定时器 */
int sub_recv_list_len; /* 接收订阅信息的链表长度 */
rt_list_t sub_recv_list; /* 订阅消息的链表头 */
rt_list_t qos2_msg_list; /* QoS2的消息链表 */
struct umqtt_pubrec_msg pubrec_msg[PKG_UMQTT_QOS2_QUE_MAX]; /* 发布收到消息数组(QoS=2) */
umqtt_user_callback user_handler; /* 用户句柄 */
void *user_data; /* 用户数据 */
rt_thread_t task_handle; /* umqtt任务线程 */
rt_list_t list; /* umqtt链表头 */
};
部分成员的结构体和枚举类型定义,可自行在umqtt.h文件中查看。
该结构体会在创建客户端函数umqtt_create中,调用umqtt_check_def_info函数之后初始化:
- 初始化遗嘱数据结构(如果有的话)
- 为收发缓冲区申请内存
- 创建互斥锁、消息队列和超时重连定时器(超时回调实现重连+保活)
- 初始化各链表
- 创建umqtt_thread–mqtt数据收发线程
- 返回mqtt_client结构体
当第6步返回的值不为空时,即可调用umqtt_start函数来通过LWIP发送CONNECT报文连接Broker,连接成功后便会启动umqtt_thread线程,开启MQTT的通信。
LWIP
LWIP(轻量级IP)是一种开源、轻量且高效的Internet协议(IP)套件实现。
它主要设计用于嵌入式系统,并经常用于具有有限资源的小型到中型设备,如微控制器、物联网设备和实时操作系统。
LWIP是用C编程语言编写的,旨在为资源受限的环境提供网络功能。
LWIP的主要特点和特性包括:
- 小巧的占用空间:LWIP旨在具有最小的内存和代码大小占用,适用于资源受限的设备。
- 支持常见网络协议:LWIP支持标准的网络协议,如IPv4、IPv6、TCP(传输控制协议)、UDP(用户数据报协议)、ICMP(Internet控制消息协议)等。
- 可移植性:LWIP非常可移植,可以轻松适应各种嵌入式平台和操作系统。
- 与实时操作系统(RTOS)集成:LWIP通常与FreeRTOS等RTOS系统一起使用,适用于实时和多任务应用。
- BSD套接字API,使熟悉标准套接字编程的开发人员能够更轻松地使用该库。
- 可扩展性:LWIP设计为可扩展性,允许开发人员根据需要为其特定应用程序添加自定义功能或协议。
- 开源:LWIP以开源许可证分发(通常是MIT许可证),允许免费使用。
uMQTT与LWIP
在umqtt_start函数中,首先会将uMQTT客户端的状态置为UMQTT_CS_LINKING,表示正在连接中,接下来会调用umqtt_connect函数,将本地客户端连接到Broker。
连接到Broker的过程分两步:
- 创建套接字,与Broker建立链路连接。
- 发送CONNECT报文,创建MQTT协议连接。
在umqtt_connect函数中,通过调用umqtt_trans_connect函数,来完成第一步:
int umqtt_trans_connect(const char *uri, int *sock)
{
int _ret = 0;
struct addrinfo *addr_res = RT_NULL;
*sock = -1;
// 域名解析
_ret = umqtt_resolve_uri(uri, &addr_res);
if((_ret < 0) || (addr_res == RT_NULL))
{
LOD_E("resolve uri err");
_ret = UMQTT_FAILED;
goto exit;
}
//创建套接字
if((*sock = socket(addr_res->ai_family, SOCK_STREAM, UMQTT_SOCKET_PROTOCOL))< 0 )
{
LOG_E("create socket error!");
_ret = UMQTT_FAILED;
goto exit;
}
//设置套接字工作在非阻塞模式下
_ret = ioctlsocket(*sock, FIONBIO, 0);
if (_ret < 0)
{
LOG_E(" iocontrol socket error!");
_ret = UMQTT_FAILED;
goto exit;
}
// 建立连接
if( (_ret = connect(*sock, addr_res->ai_addr, addr_res->ai_addrlen)) < 0)
{
LOG_E(" connect err!");
closesocket(*sock);
*sock = -1;
_ret = UMQTT_FAILED;
goto exit;
}
exit:
if (addr_res) {
freeaddrinfo(addr_res);
addr_res = RT_NULL;
}
return _ret;
}
这个函数,就是uMQTT通过LWIP与Broker建立连接的核心函数。
该函数是通过SAL即套接字抽象层组件,来调用相关接口访问LWIP的。
用到的部分SAL组件封装的函数(getaddrinfo是在umqtt_resolve_uri函数中用来解析域名的)
int getaddrinfo(const char *nodename,
const char *servname,
const struct addrinfo *hints,
struct addrinfo **res)
{
return sal_getaddrinfo(nodename, servname, hints, res);
}
---------------------------------------------------------------------------------------------
#define connect(s, name, namelen) sal_connect(s, name, namelen)
#define recvfrom(s, mem, len, flags, from, fromlen) sal_recvfrom(s, mem, len, flags, from, fromlen)
#define send(s, dataptr, size, flags) sal_sendto(s, dataptr, size, flags, NULL, NULL)
#define socket(domain, type, protocol) sal_socket(domain, type, protocol)
#define closesocket(s) sal_closesocket(s)
#define ioctlsocket(s, cmd, arg) sal_ioctlsocket(s, cmd, arg)
uMQTT发送组包
当uMQTT客户端与Broker成功建立链路层连接后,就会立刻发送CONNECT报文,建立MQTT的协议层连接。
uMQTT组件使用了巧妙的结构体+共用体来管理所有的收发报文:
union umqtt_pkgs_msg /* mqtt message packet type */
{
struct umqtt_pkgs_connect connect; /* connect */
struct umqtt_pkgs_connack connack; /* connack */
struct umqtt_pkgs_publish publish; /* publish */
struct umqtt_pkgs_puback puback; /* puback */
struct umqtt_pkgs_pubrec pubrec; /* publish receive (QoS 2, step_1st) */
struct umqtt_pkgs_pubrel pubrel; /* publish release (QoS 2, step_2nd) */
struct umqtt_pkgs_pubcomp pubcomp; /* publish complete (QoS 2, step_3rd) */
struct umqtt_pkgs_subscribe subscribe; /* subscribe topic */
struct umqtt_pkgs_suback suback; /* subscribe ack */
struct umqtt_pkgs_unsubscribe unsubscribe; /* unsubscribe topic */
struct umqtt_pkgs_unsuback unsuback; /* unsubscribe ack */
};
struct umqtt_msg
{
union umqtt_pkgs_fix_header header; /* fix header */
rt_uint32_t msg_len; /* message length */
union umqtt_pkgs_msg msg; /* retain payload message */
};
union umqtt_pkgs_msg是一个联合体,包含了多种不同类型的MQTT消息包。
每个成员都对应一种MQTT消息类型,如connect、connack、publish等。
这个联合体的作用是可以容纳不同种类的MQTT消息包,但在任何给定时刻只能包含一个有效的消息包。使得在处理MQTT消息时,可以有效地共享内存空间,减小内存占用。
struct umqtt_msg包含了三个成员:
- header:是一个联合体’umqtt_pkgs_fix_header’,表示MQTT消息的固定头部,它包含了消息的控制标志和其他必要的元数据。
- msg_len:是一个32位整数,表示MQTT消息的长度。
- msg:是一个联合体,用于存储具体类型的MQTT消息包。通过header中的控制标志,可以确定在’msg’中使用哪个成员。
通过umqtt_encode函数来调用不同的组包函数,填充对应格式的结构体,然后发送到Broker服务端。
/**
* packaging the data according to the format
*
* @param type the input packaging type
* @param send_buf the output send buf, result of the package
* @param send_len the output send buffer length
* @param message the input message
*
* @return <=0: failed or other error
* >0: package data length
*/
int umqtt_encode(enum umqtt_type type, rt_uint8_t *send_buf, size_t send_len, struct umqtt_msg *message)
{
int _ret = 0;
switch (type)
{
case UMQTT_TYPE_CONNECT:
_ret = umqtt_connect_encode(send_buf, send_len, &(message->msg.connect));
break;
case UMQTT_TYPE_PUBLISH:
_ret = umqtt_publish_encode(send_buf, send_len, message->header.bits.dup, message->header.bits.qos, &(message->msg.publish));
break;
case UMQTT_TYPE_PUBACK:
_ret = umqtt_puback_encode(send_buf, send_len, message->msg.puback.packet_id);
break;
case UMQTT_TYPE_PUBREC:
// _ret = umqtt_pubrec_encode();
break;
case UMQTT_TYPE_PUBREL:
_ret = umqtt_pubrel_encode(send_buf, send_len, message->header.bits.dup, message->msg.pubrel.packet_id);
break;
case UMQTT_TYPE_PUBCOMP:
_ret = umqtt_pubcomp_encode(send_buf, send_len, message->msg.pubcomp.packet_id);
break;
case UMQTT_TYPE_SUBSCRIBE:
_ret = umqtt_subscribe_encode(send_buf, send_len, &(message->msg.subscribe));
break;
case UMQTT_TYPE_UNSUBSCRIBE:
_ret = umqtt_unsubscribe_encode(send_buf, send_len, &(message->msg.unsubscribe));
break;
case UMQTT_TYPE_PINGREQ:
_ret = umqtt_pingreq_encode(send_buf, send_len);
break;
case UMQTT_TYPE_DISCONNECT:
_ret = umqtt_disconnect_encode(send_buf, send_len);
break;
default:
break;
}
return _ret;
}
由于报文类型较多,接下来仅以“CONNECT”报文(可变header——“协议名称”、“协议等级”、“连接标志”、“保活间隔(秒),有效载荷——“客户端标识符”、“遗嘱主题”、“遗嘱消息”、“用户名”、“密码”)为例”),来简述uMQTT的组包过程:
1.填充MQTT客户端的默认配置信息
2.调用umqtt_encode -> umqtt_connect_encode编码函数组包:
该函数首先调用MQTTSerialize_connectLength计算可变header和有效载荷的长度,得到的len会被作为参数传递给umqtt_pkgs_len函数,它的作用是计算固定header中的剩余长度字段的字节数并加上固定header第一个字节长度即1,与buflen作比较,判断该包数据的有效性。
if (umqtt_pkgs_len(len = MQTTSerialize_connectLength(options)) > buflen)
希望得到的len长度就是固定header中的剩余长度值,从而方便后面的组包过程,而有效的报文长度buflen = len + 1+ 剩余长度字段的字节数。
组包过程完成之后,会调用umqtt_trans_send函数,通过LWIP将发送缓冲区数据发送到socket连接的Broker。
int umqtt_trans_send(int sock, const rt_uint8_t *send_buf, rt_uint32_t buf_len, int timeout)
{
int _ret = 0;
rt_uint32_t offset = 0U;
while(offset < buflen)
{
_ret = send(sock, send_buf + offset, buf_len - offset, 0);
if(_ret < 0)
return -errno;
offset += _ret;
}
return _ret;
}
uMQTT接收解包
当uMQTT将CONNECT报文发送完成后,就会调用umqtt_handle_readpacket函数(完成CONNECT过程后,该函数也会在umqtt_thread线程中被循环调用来接收数据)读取Broker的回复,对接收到的数据包进行解包处理:
- 读Fixed header的第一个字节,这里会调用umqtt_trans_recv函数读取socket数据,作用就是从对应的sock中读取buf_len长度的数据到recv_buf。
- 读Fixed header的Remaining length字段并解析剩余长度。
- 读剩余数据——可变header+有效载荷。
- 解析数据包,并根据不同报文类型做相应处理。
- UMQTT_TYPE_CONNACK:调用set_uplink_recon_tick(client, UPLINK_NEXT_TICK)函数设置下一次重连滴答值,调用set_connect_status(client, UMQTT_CS_LINKED)函数设置uMQTT客户端状态为已连接。
设置重连滴答值
设置下一次重连滴答值(reconnect tick)的目的是为了控制在失去连接后,尝试重新建立连接的时间间隔。
- 网络部稳定性:在物联网(IoT)或其他网络应用中,设备可能会面临不稳定的网络条件,如临时的断线或信号干扰。在这种情况下,失去连接后立即进行重连可能会导致频繁的连接尝试,造成网络资源浪费,增加设备的电力消耗。
- 避免过早的重连:设置重连滴答值可以防止设备过早地尝试重新连接。重连的时间间隔可以根据设备和网络的特定需求进行调整,以确保在网络稳定之前不会进行不必要的连接尝试。
- 节约资源: 预定重连滴答值有助于节约设备资源,因为设备不必持续监视网络状态并进行连接尝试。它可以在预定的时间间隔后尝试连接,以降低设备的功耗。
- 逐渐增加重连频率: 通常,设置下一次重连滴答值的方式是逐渐增加重连频率。这意味着如果第一次重连尝试失败,设备可能会等待一段较长的时间,然后在下一次尝试时等待较短的时间。这种策略允许设备在网络稳定性恢复时更频繁地尝试连接,同时避免了过于频繁的连接尝试。
总之,设置下一次重连滴答值是为了在网络不稳定或断开连接的情况下,以一种经济高效的方式管理设备的连接重试,以提高设备的性能和资源利用率。
至此,已经完成了CONNECT报文的收发过程,下一步就是启动umqtt_thread线程,调用umqtt_handle_readpacket函数来处理从Broker服务器端收到的数据报文。