一、简介
有关MQTT的相关概念介绍,请看之前的文章,这里不做过多的介绍:MQTT学习总结_t_guest的博客-CSDN博客
本章节需要使用如下软件:
Mosquitto(MQTT消息代理工具)
Eclipsse paho MQTT工具
二、操作说明
1.下载Mosquitto工具。
点击Download | Eclipse Mosquitto进入官网。选择自己系统的下载文件。这里使用的是win10 64位系统。
2.添加配置
安装成功后,以文件的形式打开安装文件夹中的mosquitto.conf文件。
在Listener处添加需要监听的端口和IP。这里添加的IP为自己电脑的IP。如果不知道自己电脑的IP是多少,可以在win+r中输入cmd打开终端,然后在终端中输入ipconfig来查看。
修改allow_anonymous false为allow_anonymous true,即允许匿名登录。
Ctrl+s保存设置。
3.启动服务
在windows的搜索栏里边搜索服务。
找到Mosquitto Broker服务,并且右键启动该服务。
成功后,显示如下。
注:每次修改mosquitto.conf文件后,都需要重新启动Mosquitto Broker服务,这样设置才会生效。
如果设置的IP不对,启动服务时会报异常。
4.下载Eclipse Paho MQTT工具。
点击Index of /repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.ui.app/1.1.1选择对应版本进行下载。
5.新建客户端
解压后,点击应用文件。
注:如果此时报错,标明当前环境下未安装JAVA环境,可以参考如下链接进行配置Windows安装JDK详细教程(图文教程) - 知乎。
打开客户端后,点击左上角的+号新建客户端。
新建成功后会自动生成服务器地址:tcp://localhost:1883。这个就是mosquitto监控的端口。localhost就是自己的主机,所以,也可以将上述链接修改为tcp://192.168.3.199:1883(192.168.3.199是本机IP),同样是可以连接的。
6.订阅和发布
点击+号,添加要订阅的主题,并且点击订阅。
输入要发布的主题,点击发布。
三、API介绍
NetworkInit
函数功能:
初始化MQTT相关的参数和回调。
函数原型:
void NetworkInit(Network* n)
参数:
n:网络参数,结构体为Network
typedef struct Network
{
int my_socket;
int (*mqttread) (struct Network*, unsigned char*, int, int);
int (*mqttwrite) (struct Network*, unsigned char*, int, int);
} Network;
my_socket:后续mqtt要使用到的描述符。
mqttread:mqtt的读回调函数
mqttwrite:mqtt的写回调函数
返回值:
无
实例:
Network network;
NetworkInit(&network);
NetworkConnect
函数功能:
连接MQTT服务器
函数原型:
int NetworkConnect(Network* n, char* addr, int port)
参数:
n:网络参数描述符
addr:要连接的服务器IP
port:要连接的服务器端口
返回值:
-1:失败
0:成功
实例:
Network network;
NetworkConnect(&network, "192.168.3.15", 1883);
MQTTClientInit
函数功能:
创建一个MQTT客户端实例
函数原型:
void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
参数:
c:客户端描述符,结构体为MQTTClient
typedef struct MQTTClient
{
unsigned int next_packetid, //下个包ID
command_timeout_ms; //指令超时时间
size_t buf_size, //缓冲区大小
readbuf_size; //读缓冲区大小
unsigned char *buf, //缓冲区指针
*readbuf; //读缓冲区指针
unsigned int keepAliveInterval; //保活间隔
char ping_outstanding; //ping 包
int isconnected; //已连接标志
int cleansession; //清除回话
struct MessageHandlers
{
const char* topicFilter; //主题文件
void (*fp) (MessageData*); //消息数据
} messageHandlers[MAX_MESSAGE_HANDLERS]; /* Message handlers are indexed by subscription topic */
void (*defaultMessageHandler) (MessageData*); //默认消息回调
Network* ipstack; //IP栈
Timer last_sent, last_received, pingresp_timer; //最近一次的发送和接收时间戳
#if defined(MQTT_TASK)
Mutex mutex; //互斥锁
Thread thread; //任务
#endif
} MQTTClient;
network:网络参数描述符
command_timeout_ms:指令超时时间
sendbuf:发送缓冲区
sendbuf_size:缓冲区大小
readbuf:接收缓冲区
readbuf_size:缓冲区大小
返回值:
无
实例:
Network network;
MQTTClient client;
static unsigned char sendBuf[1000];
static unsigned char readBuf[1000];
MQTTClientInit(&client, &network, 2000, sendBuf, sizeof(sendBuf), readBuf, sizeof(readBuf));
MQTTConnect
函数功能:
发送一个MQTT连接包到服务器,并且等待连接应答
函数原型:
int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
参数:
c:客户端描述符,MQTTClientInit函数参数。
options:连接参数,结构体为MQTTPacket_connectData
typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4]; //协议名称,固定为“MQTC”
/** The version number of this structure. Must be 0 */
int struct_version; //结构版本,固定为0
/** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1
*/
unsigned char MQTTVersion; //MQTT版本
MQTTString clientID; //客户端ID
unsigned short keepAliveInterval; //保活间隔
unsigned char cleansession; //清除回话标志
unsigned char willFlag; //遗愿标志
MQTTPacket_willOptions will; //遗愿
MQTTString username; //用户名
MQTTString password; //密码
} MQTTPacket_connectData;
返回值:
0:成功
其他值:失败
实例:
MQTTClient client;
#define MQTTPacket_connectData_initializer { {'M', 'Q', 'T', 'C'}, 0, 4, {NULL, {0, NULL}}, 60, 1, 0, \
MQTTPacket_willOptions_initializer, {NULL, {0, NULL}}, {NULL, {0, NULL}} }
#define MQTTString_initializer {NULL, {0, NULL}}
MQTTString clientId = MQTTString_initializer;
clientId.cstring = "Harmony";
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.clientID = clientId; //客户端ID
data.willFlag = 0; //没有遗愿
data.MQTTVersion = 3; //MQTT版本
data.keepAliveInterval = 60; //保活间隔
data.cleansession = 1; //清除会话
MQTTConnect(&client, &data);
MQTTSubscribe
函数功能:
设置订阅参数
函数原型:
int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
messageHandler messageHandler)
参数:
c:客户端描述符,MQTTClientInit函数参数。
tipicFiter:订阅的主题
qos:通讯报发送状态
QoS0:至多发送一次(可能会丢包)
QoS1:最少一次(保证包到达,可能会出现重包)
QoS2:只有一次(保证包会到达目的地,且不会出现重包)
messageHandler:接收消息的回调函数
返回值:
0:成功
其他值:失败
实例:
MQTTClient client;
MQTTSubscribe(&client, "substopic", QOS2, messageArrived);
MQTTPublish
函数功能:
发送一个MQTT发布包并且等待应答包去完成QoS
函数原型:
int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
参数:
c:客户端描述符,MQTTClientInit函数参数。
topicName:目标主题
message:推送的消息,结构体为MQTTMessage
typedef struct MQTTMessage
{
enum QoS qos; //QoS值
unsigned char retained; //为1时,briker应该保存该条消息,当之后有任何新的订阅主题设备上线时,都会收到这条消息。
unsigned char dup; //消息重发标志,为1时,表示这是一条重发的消息
unsigned short id; //数据包标识
void *payload; //有效数据负载
size_t payloadlen; //负载长度
} MQTTMessage;
返回值:
0:成功
其他值:失败
实例:
MQTTClient client;
MQTTMessage message;
nt rc = 0;
char payload[] = "this is a topic MQTT msg";
message.qos = 2; //只有一次,
message.retained = 0; //服务器不保存
message.payload = payload; //负载
message.payloadlen = strlen(payload); //长度
if ((rc = MQTTPublish(&client, "pubtopic", &message)) != 0){} //失败
MQTTYield
函数功能:
MQTT主动获取数据,获取到数据,会触发MQTTSubscribe订阅函数的接收回调
函数原型:
int MQTTYield(MQTTClient* c, int timeout_ms)
参数:
c:客户端描述符,MQTTClientInit函数参数。
timeout_ms:等待超时时间
返回值:
0:成功
其他值:失败
实例:
MQTTClient client;
MQTTYield(client,1000);
注:MQTTYield不只是轮训检查MQTT读消息。在轮训过程中还会检测保活周期,如果保活周期到了,就向MQTT服务器发送一个ping包来进行保活。如果在这期间有MQTT数据交互,则重新开始计数。此外需要注意的是,调用该函数后,程序会不停的执行,直到超时时间到。即该函数会占用CPU使用权,导致低优先级的任务无法执行。
NetworkDisconnect
函数功能:
网络连接断开,断开SOCKET
函数原型:
void NetworkDisconnect(Network* n)
参数:
n:网络参数描述符,NetworkConnect函数设置
返回值:
无
实例:
Network network;
NetworkDisconnect(&network);
MQTTDisconnect
函数功能:
断开MQTT,向服务器发送断开数据包
函数原型:
int MQTTDisconnect(MQTTClient* c)
参数:
c:客户端描述符,MQTTClientInit 函数初始化
返回值:
0:成功
其他值:失败
实例:
MQTTClient client;
MQTTDisconnect(&client);
四、实例
这里创建一个MQTT客户端,与PC端另外一个客户端进行订阅与发布的通信。
现在BUILD.gn文件中添加如下代码:
include_dirs = [
"//utild/native/lite/include",
"//base/iot_hardware/interfaces/kits/wifiiot_lite",
"//utils/native/lite/include",
"//kernel/liteos_m/components/cmsis/2.0",
"//foundation/communication/interfaces/kits/wifi_lite/wifiservice",
"//vendor/hisi/hi3861/hi3861/third_party/lwip_sack/include/",
"//third_party/cJSON",
"//third_party/paho_mqtt/MQTTPacket/src",
"//third_party/paho_mqtt/MQTTClient-C/src",
"src",
]
deps = [
"//third_party/paho_mqtt:pahomqtt_static",
]
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "ohos_init.h"
#include "cmsis_os2.h"
#include "MQTTClient.h"
#define LOG_I(fmt, args...) printf("<%8ld> - [MQTT]:"fmt"\r\n",osKernelGetTickCount(),##args);
#define LOG_E(fmt, args...) printf("<%8ld>-[MQTT_ERR]>>>>>>>>>>>>:"fmt"\r\n",osKernelGetTickCount(), ##args);
static unsigned char sendBuf[1000];
static unsigned char readBuf[1000];
Network network;
void messageArrived(MessageData* data)
{
LOG_I("Message arrived on topic %.*s: %.*s\n", data->topicName->lenstring.len, data->topicName->lenstring.data,
data->message->payloadlen, data->message->payload);
}
/* */
#define MQTT_IP "192.168.3.15"
#define MQTT_PORT 1883
static void MQTT_DemoTask(void)
{
//连接Wifi
extern int drv_wifi_connect(const char *ssid, const char *psk);
drv_wifi_connect("HUAWEI_ZNJJ", "znjj1230");
LOG_I("wifi connect success,mqtt init starting ...\n");
int rc, count = 0;
MQTTClient client;
NetworkInit(&network);
LOG_I("NetworkConnect ...,IP:%s,port:%d\n",MQTT_IP,MQTT_PORT);
begin:
NetworkConnect(&network, MQTT_IP, MQTT_PORT);
LOG_I("MQTTClientInit ...\n");
MQTTClientInit(&client, &network, 2000, sendBuf, sizeof(sendBuf), readBuf, sizeof(readBuf));
MQTTString clientId = MQTTString_initializer;
clientId.cstring = "Harmony";
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.clientID = clientId; //客户端ID
data.willFlag = 0; //没有遗愿
data.MQTTVersion = 3; //MQTT版本
data.keepAliveInterval = 60; //保活间隔
data.cleansession = 1; //清除会话
LOG_I("MQTTConnect ...\n");
rc = MQTTConnect(&client, &data);
if (rc != 0) {
LOG_E("MQTTConnect: %d\n", rc);
NetworkDisconnect(&network);
MQTTDisconnect(&client);
osDelay(200);
goto begin;
}
LOG_I("MQTTSubscribe ...\n");
rc = MQTTSubscribe(&client, "substopic", QOS2, messageArrived);
if (rc != 0) {
LOG_E("MQTTSubscribe: %d\n", rc);
osDelay(200);
goto begin;
}
LOG_I("MQTTSubscribe success\n");
while (++count)
{
MQTTMessage message;
char payload[30];
message.qos = 2; //只有一次,
message.retained = 0; //服务器不保存
message.payload = payload; //负载
sprintf(payload, "message number %d", count);
message.payloadlen = strlen(payload); //长度
if ((rc = MQTTPublish(&client, "pubtopic", &message)) != 0){
LOG_I("Return code from MQTT publish is %d\n", rc);
NetworkDisconnect(&network);
MQTTDisconnect(&client);
goto begin;
}
LOG_I("MQTT publish success");
osDelay(100);
}
}
void app_mqtt_init(void)
{
osThreadAttr_t attr;
attr.name = "MQTT_DemoTask";
attr.attr_bits = 0U;
attr.cb_mem = NULL;
attr.cb_size = 0U;
attr.stack_mem = NULL;
attr.stack_size = 10240;
attr.priority = osPriorityNormal;
if (osThreadNew((osThreadFunc_t)MQTT_DemoTask, NULL, &attr) == NULL) {
LOG_I("[MQTT_Demo] Falied to create MQTT_DemoTask!\n");
}
}
这里不停的向“pubtopic”主题发送数据,周期为1秒。且订阅“substopic”主题的消息,如果有,则调用messageArrived回调打印数据。
看结果:
可以看到,芯片的订阅是PC的发布,芯片的发布是PC的订阅。
注:Eclipsse paho MQTT最好在笔记本电脑上使用,台式机可能会无法链接。