MQTT 是一种机器对机器(M2M)/物联网(IoT)连接协议。它被设计为一个极其轻量级的发布/订阅消息传输协议,对于需要较小代码占用空间和/或网络带宽非常宝贵的远程连接非常有用,是专为受限设备和低带宽、高延迟或不可靠的网络而设计的。这些原则也使该协议成为新兴的“机器到机器”(M2M)或物联网(IoT)世界中的连接设备,以及带宽和电池电量都非常宝贵的移动应用的理想选择。例如,它已被用于通过卫星链路与代理(Broker)通信的传感器、与医疗服务提供者的拨号连接,以及一系列家庭自动化和小型设备场景。它也是移动应用的理想选择,因为它体积小,功耗低,数据包小,并且可以有效地将信息分发给一个或多个接收者。MQTT 通信模型如下图所示:
前提是需要先在电脑上运行一个MQTT服务器。EMQX软件非常合适,它还自带MQTT客户端,方便调试。具体下载和安装方法见下面的博文,也可以去EMQX官网下载最新版免费试用。
MQTT:windows最简单搭建mqtt服务端及本地客户端测试_emqx-windows-4.3.6-CSDN博客
修改网络参数
在Hi3861开发板上运行本示例程序之前,需要根据你的无线路由器和MQTT服务器的IP地址修改相关宏定义(热点及TCP服务器参数见net_params.h文件,MQTT相关参数定义在network_mqtt_example.c文件开头):
- PARAM_HOTSPOT_SSID 修改为你的热点名称;
- PARAM_HOTSPOT_PSK 修改为你的热点密码;
- PARAM_SERVER_ADDR 修改为你的服务器IP地址;
- PARAM_SERVER_PORT 修改为你的服务器端口号;
- SERVER_IP_ADDR 修改为你的mqtt服务器IP地址;
- SERVER_IP_PORT 修改为mqtt的tcp端口,默认为1883;
- MQTT_TOPIC_SUB 修改为要订阅的主题(Topic)名称;
- MQTT_TOPIC_PUB 修改为要发布的主题(Topic)名称;
- MQTT_CLIENT_ID MQTT的客户端ID;
- MQTT_USER_NAME MQTT的用户名;
- MQTT_PASSWORD MQTT的密码;
代码编写
修改D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\BUILD.gn文件
# Copyright (c) 2023 Beijing HuaQing YuanJian Education Technology Co., Ltd
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//build/lite/config/component/lite_component.gni")
# Aggregate component for the demo directory. Each entry in `features` has the
# form "<sub-directory>:<target name>". Only the MQTT demo is enabled here; all
# other demos are commented out.
lite_component("demo") {
features = [
#"base_00_helloworld:base_helloworld_example",
#"base_01_led:base_led_example",
#"base_02_loopkey:base_loopkey_example",
#"base_03_irqkey:base_irqkey_example",
#"base_04_adc:base_adc_example",
#"base_05_pwm:base_pwm_example",
#"base_06_ssd1306:base_ssd1306_example",
#"kernel_01_task:kernel_task_example",
#"kernel_02_timer:kernel_timer_example",
#"kernel_03_event:kernel_event_example",
#"kernel_04_mutex:kernel_mutex_example",
#"kernel_05_semaphore_as_mutex:kernel_semaphore_as_mutex_example",
#"kernel_06_semaphore_for_sync:kernel_semaphore_for_sync_example",
#"kernel_07_semaphore_for_count:kernel_semaphore_for_count_example",
#"kernel_08_message_queue:kernel_message_queue_example",
#"wifi_09_hotspot:wifi_hotspot_example",
#"wifi_10_sta:wifi_sta_example",
#"tcp_11_server:tcp_server_example",
#"tcp_12_client:tcp_client_example",
#"udp_13_server:udp_server_example",
#"udp_14_client:udp_client_example",
# The only enabled feature: the static_library target "network_mqtt_example"
# in the network_15_mqtt sub-directory.
"network_15_mqtt:network_mqtt_example",
]
}
创建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt文件夹
文件夹中创建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\BUILD.gn文件
# Copyright (c) 2023 Beijing HuaQing YuanJian Education Technology Co., Ltd
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Static library for the MQTT demo: three local sources (demo entry point,
# MQTT client wrapper, Wi-Fi connection helper) plus the Paho embedded-C
# MQTTPacket sources, which are compiled directly into this target.
static_library("network_mqtt_example") {
sources = [
"network_mqtt_example.c",
"network_mqtt.c",
"wifi_connecter.c",
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTConnectClient.c",
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTConnectServer.c",
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTDeserializePublish.c",
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTFormat.c",
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTPacket.c",
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTSerializePublish.c",
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTSubscribeClient.c",
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTSubscribeServer.c",
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTUnsubscribeServer.c",
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTUnsubscribeClient.c",
]
include_dirs = [
"//utils/native/lite/include",
"//kernel/liteos_m/kal/cmsis",
"//base/iot_hardware/peripheral/interfaces/kits",
"//foundation/communication/wifi_lite/interfaces/wifiservice",
# NOTE(review): this path points at vendor/hqyj/fs_hi3861, while the article
# places the demo under vendor/rtplay/rt_hi3861 - confirm the directory exists
# in your source tree.
"//vendor/hqyj/fs_hi3861/common/bsp/include",
# Needed for MQTTPacket.h, which network_mqtt.c includes.
"//third_party/paho.mqtt.embedded-c/MQTTPacket/src",
# NOTE(review): none of the sources shown for this demo include cJSON headers;
# presumably kept for later extension - confirm before removing.
"//third_party/cJSON",
]
}
文件夹中创建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\network_mqtt.h文件,文件主要包含mqtt的函数。
/*
* Copyright (c) 2023 Beijing HuaQing YuanJian Education Technology Co., Ltd
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef NETWORK_MQTT_H
#define NETWORK_MQTT_H

#include <stdint.h> /* int8_t, used by the receive-callback pointer type below */

/**
 * @brief Open a TCP connection to the MQTT server.
 * @param ip_addr Server IPv4 address as a dotted-decimal string.
 * @param ip_port Server TCP port.
 * @return 0 on success, -1 on failure.
 */
int MQTTClient_connectServer(const char *ip_addr, int ip_port);

/**
 * @brief Close the connection to the MQTT server.
 * @return 0 on success, -1 on failure.
 */
int MQTTClient_unConnectServer(void);

/**
 * @brief Subscribe to an MQTT topic.
 * @param subTopic Name of the topic to subscribe to.
 * @return 0 on success, -1 on failure.
 */
int MQTTClient_subscribe(char *subTopic);

/**
 * @brief Initialise the MQTT client (sends CONNECT and waits for CONNACK).
 * @param clientID Client identifier.
 * @param userName User name.
 * @param password Password.
 * @return 0 on success, -1 on failure.
 */
int MQTTClient_init(char *clientID, char *userName, char *password);

/**
 * @brief Publish a message.
 * @param pub_Topic   Name of the topic to publish to.
 * @param payloadData Data to publish.
 * @param payloadLen  Length of the data in bytes.
 * @return 0 on success, -1 on failure.
 */
int MQTTClient_pub(char *pub_Topic, unsigned char *payloadData, int payloadLen);

/**
 * @brief Receive a message.
 *
 * Takes no parameters: a received message is handed to the function that
 * p_MQTTClient_sub_callback points to, so that pointer must be set before
 * this function is called.
 *
 * @return Declared as 0 on success, -1 on failure.
 *         NOTE(review): confirm the implementation returns a value on every path.
 */
int MQTTClient_sub(void);

/**
 * Callback invoked by MQTTClient_sub() with the topic and payload of a
 * received message. Defined in network_mqtt.c; assigned by the application.
 */
extern int8_t (*p_MQTTClient_sub_callback)(unsigned char *topic, unsigned char *payload);

#endif // !NETWORK_MQTT_H
文件夹中创建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\network_mqtt.c文件,文件主要包含mqtt的函数实现。
/*
* Copyright (c) 2023 Beijing HuaQing YuanJian Education Technology Co., Ltd
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include "lwip/netifapi.h"
#include "lwip/sockets.h"
#include "wifi_device.h"
#include "ohos_init.h"
#include "MQTTPacket.h"
#include "network_mqtt.h"
// Size in bytes of the MQTT packet buffers used in this file.
#define MQTT_BUFF_MAX_SIZE 512
int g_tcp_socket_fd = 0; // TCP socket descriptor shared by the transport helpers below
// Shared scratch buffer for serialising outgoing and reading incoming MQTT packets.
unsigned char mqttBuff[MQTT_BUFF_MAX_SIZE] = {0};
// Send a whole packet over the global TCP socket.
//
// send() on a stream socket may accept fewer bytes than requested, so keep
// sending the remainder until everything is written; otherwise a truncated
// MQTT packet would be reported as a success.
//
// buf:    bytes to send.
// buflen: number of bytes to send.
// Returns 1 when all buflen bytes were sent, 0 on any failure (including an
// empty or NULL buffer).
static int transport_sendPacketBuffer(unsigned char *buf, int buflen)
{
    if (buf == NULL || buflen <= 0) {
        return 0;
    }

    int sent = 0;
    while (sent < buflen) {
        int rc = (int)send(g_tcp_socket_fd, buf + sent, (size_t)(buflen - sent), 0);
        if (rc <= 0) {
            return 0;
        }
        sent += rc;
    }
    return 1;
}
// Read up to `count` bytes from the global TCP socket into `buf`.
// Hands back recv()'s result unchanged; this is the reader callback passed to
// MQTTPacket_read() in this file.
static int transport_getdata(unsigned char *buf, int count)
{
    return recv(g_tcp_socket_fd, buf, count, 0);
}
// Connect to the server: create a TCP socket and connect it to ip_addr:ip_port.
//
// ip_addr: server IPv4 address as a dotted-decimal string.
// ip_port: server TCP port (1..65535).
// Returns 0 on success, -1 on failure. On failure no socket is left open and
// g_tcp_socket_fd holds a negative value.
int MQTTClient_connectServer(const char *ip_addr, int ip_port)
{
    if (ip_addr == NULL || ip_port <= 0 || ip_port > 65535) {
        printf("Incorrect parameters\r\n");
        return -1;
    }

    // Zero-initialise so that no field of the address structure is left indeterminate.
    struct sockaddr_in tcpServerConfig = {0};
    tcpServerConfig.sin_family = AF_INET;                      // IPv4
    tcpServerConfig.sin_port = htons((unsigned short)ip_port); // server port
    tcpServerConfig.sin_addr.s_addr = inet_addr(ip_addr);      // server IP address
    if (tcpServerConfig.sin_addr.s_addr == INADDR_NONE) {
        printf("Invalid server IP address\r\n");
        return -1;
    }

    // Create the TCP socket; without one there is nothing to connect.
    g_tcp_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (g_tcp_socket_fd < 0) {
        printf("Failed to create Socket\r\n");
        return -1;
    }

    // Connect to the TCP server.
    if (connect(g_tcp_socket_fd, (struct sockaddr *)&tcpServerConfig, sizeof(tcpServerConfig)) == -1) {
        printf("Failed to connect to the server\r\n");
        // Release the descriptor so repeated attempts do not leak sockets.
        close(g_tcp_socket_fd);
        g_tcp_socket_fd = -1;
        return -1;
    }

    printf("Connection to server successful\r\n");
    return 0;
}
// Disconnect from the TCP server by closing the global socket.
// The descriptor variable is reset to 0 either way.
// Returns 0 on success, -1 on failure.
int MQTTClient_unConnectServer(void)
{
    int ret = close(g_tcp_socket_fd);
    g_tcp_socket_fd = 0;

    // Report the real outcome instead of announcing success before trying.
    if (ret != 0) {
        printf("Failed to shut down the server connection\r\n");
        return -1;
    }
    printf("Server shut down successfully\r\n");
    return 0;
}
// MQTT client: subscribe to one topic at QoS 0 and wait for the SUBACK.
//
// subTopic: name of the topic to subscribe to.
// Returns 0 when the server acknowledged and granted the subscription,
// -1 otherwise.
int MQTTClient_subscribe(char *subTopic)
{
    if (subTopic == NULL) {
        printf("Incorrect parameters\r\n");
        return -1;
    }

    const unsigned short packetId = 1; // packet identifier of this SUBSCRIBE
    const int subackFailure = 0x80;    // SUBACK return code meaning "subscription refused"
    int req_qos = 0;
    int subcount = 0;
    int granted_qos = 0;
    unsigned short submsgid = 0;
    MQTTString topicString = MQTTString_initializer;

    /* subscribe */
    topicString.cstring = subTopic;
    int len = MQTTSerialize_subscribe(mqttBuff, sizeof(mqttBuff), 0, packetId, 1, &topicString, &req_qos);
    if (len <= 0) {
        printf("MQTTSerialize_subscribe Error %d\r\n", len);
        return -1;
    }

    int res = transport_sendPacketBuffer(mqttBuff, len);
    if (res != 1) {
        printf("transport_sendPacketBuffer Error %d\r\n", res);
        return -1;
    }

    sleep(1);
    memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));

    /* wait for suback */
    if (MQTTPacket_read(mqttBuff, sizeof(mqttBuff), transport_getdata) != SUBACK) {
        printf("MQTTPacket_read Error\r\n");
        return -1;
    }
    if (MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, mqttBuff, sizeof(mqttBuff)) != 1) {
        printf("MQTTDeserialize_suback Error\r\n");
        return -1;
    }

    // A well-formed SUBACK can still carry a refusal, or answer a different
    // request; only report success when it grants exactly this subscription.
    if (submsgid != packetId || subcount != 1 || granted_qos == subackFailure) {
        printf("MQTT subscription rejected, return code %d\r\n", granted_qos);
        return -1;
    }

    printf("MQTT subscribed to topics successfully\r\n");
    return 0;
}
// Keep-alive interval announced to the server: 60 s.
#define MQTT_KEEP_ALIVE 60
// Seconds to wait after sending CONNECT before reading the reply.
#define MQTT_DELAY_TIME 3
// MQTT client initialisation: send CONNECT and check the server's CONNACK.
//
// clientID: client identifier.
// userName: user name.
// password: password.
// Returns 0 when the server accepted the connection, -1 otherwise.
int MQTTClient_init(char *clientID, char *userName, char *password)
{
    if (clientID == NULL || userName == NULL || password == NULL) {
        printf("Incorrect parameters\r\n");
        return -1;
    }

    const int mqtt_read_len = 10; // number of reply bytes dumped for debugging
    unsigned char sessionPresent = 0;
    unsigned char connack_rc = 0;
    MQTTPacket_connectData mqttData = MQTTPacket_connectData_initializer;

    // Fill in the connection options.
    mqttData.clientID.cstring = clientID;
    mqttData.username.cstring = userName;
    mqttData.password.cstring = password;
    mqttData.cleansession = 1; // discard the previous session on connect
    mqttData.keepAliveInterval = MQTT_KEEP_ALIVE;

    // Build the CONNECT packet.
    int len = MQTTSerialize_connect(mqttBuff, sizeof(mqttBuff), &mqttData);
    if (len <= 0) {
        // Report the serialiser's own result.
        printf("MQTTSerialize_connect Error %d\r\n", len);
        return -1;
    }

    int res = transport_sendPacketBuffer(mqttBuff, len);
    if (res != 1) {
        printf("transport_sendPacketBuffer Error %d\r\n", res);
        return -1;
    }
    sleep(MQTT_DELAY_TIME);

    /* Dump the frame that was sent, for debugging.
     * Note: the frame contains the user name and password. */
    printf("MQTT_sendPacket: \r\n");
    for (int i = 0; i < len; i++) {
        printf("%x ", mqttBuff[i]);
    }
    printf("\r\n");
    memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));

    /* wait for connack */
    if (MQTTPacket_read(mqttBuff, sizeof(mqttBuff), transport_getdata) != CONNACK) {
        // No CONNACK means there is nothing valid to parse; fail here.
        printf("MQTTPacket_read != CONNACK\r\n");
        memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));
        return -1;
    }

    /* Dump the server's reply, for debugging. */
    printf("MQTT_recvPacket: \r\n");
    for (int i = 0; i < mqtt_read_len; i++) {
        printf("%x ", mqttBuff[i]);
    }
    printf("\r\n");

    int parsed = MQTTDeserialize_connack(&sessionPresent, &connack_rc, mqttBuff, sizeof(mqttBuff));
    memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));
    if (parsed != 1 || connack_rc != 0) {
        printf("Unable to connect, return code %d\r\n", connack_rc);
        return -1;
    }

    printf("MQTT initialized successfully\r\n");
    return 0;
}
// Delay between publish attempts, in microseconds (100 ms).
#define MQTT_PUB_DATA_TIME (100 * 1000)
// Publish a message at QoS 0, retrying the send a few times on failure.
//
// pub_Topic:   name of the topic to publish to.
// payloadData: data to publish; it need not be NUL-terminated.
// payloadLen:  length of the data in bytes.
// Returns 0 when the packet was sent, -1 on bad parameters, when the packet
// does not fit the send buffer, or when every send attempt failed.
int MQTTClient_pub(char *pub_Topic, unsigned char *payloadData, int payloadLen)
{
    if (pub_Topic == NULL || payloadData == NULL || payloadLen < 0) {
        printf("Incorrect parameters\r\n");
        return -1;
    }
    printf("pubTopic: %s\n", pub_Topic);
    // Bound the print by payloadLen: the payload is a byte buffer, not a C string.
    printf("pubData: %.*s\n", payloadLen, (const char *)payloadData);

    const int maxAttempts = 5; // total number of send attempts
    unsigned char sendBuff[MQTT_BUFF_MAX_SIZE] = {0};
    MQTTString topicString = MQTTString_initializer;
    topicString.cstring = pub_Topic;

    int len = MQTTSerialize_publish(sendBuff, sizeof(sendBuff), 0, 0, 0, 0, topicString,
                                    payloadData, payloadLen);
    if (len <= 0) {
        // Typically the topic plus payload do not fit in sendBuff.
        printf("MQTTSerialize_publish Error %d\r\n", len);
        return -1;
    }

    int ret = 0;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        ret = transport_sendPacketBuffer(sendBuff, len);
        if (ret == 1) {
            return 0;
        }
        printf("Send MQTT_Data Fail\r\n");
        if (attempt < maxAttempts) {
            usleep(MQTT_PUB_DATA_TIME);
        }
    }

    printf("transport_sendPacketBuffer Error %d\r\n", ret);
    return -1;
}
unsigned char mqtt_topic[200];
int8_t (*p_MQTTClient_sub_callback)(unsigned char *topic, unsigned char *payload);
int MQTTClient_sub(void)
{
int qos, payloadlen_in;
unsigned char dup, retained;
unsigned short msgid;
unsigned char *payload_in;
MQTTString receivedTopic;
memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));
// $oc/devices/63ad5a6cc4efcc747bd75973_lamp/sys/commands/request_id=42c20ffb-0885-4f6e-97b5-45d8f613efaf
if (MQTTPacket_read(mqttBuff, sizeof(mqttBuff), transport_getdata) == PUBLISH) {
MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
&payload_in, &payloadlen_in, mqttBuff, sizeof(mqttBuff));
printf("data: %s\n", receivedTopic.lenstring.data);
printf("length: %d\n", strlen(receivedTopic.lenstring.data) - payloadlen_in);
printf("payload_length: %d\n", payloadlen_in);
memcpy_s(mqtt_topic, sizeof(mqtt_topic),
receivedTopic.lenstring.data, strlen(receivedTopic.lenstring.data) - payloadlen_in);
printf("topic: %s\n", mqtt_topic);
printf("payload: %s\n", payload_in);
p_MQTTClient_sub_callback(mqtt_topic, payload_in);
}
}
文件夹中创建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\wifi_connecter.h文件,该头文件包含wifi连接的宏。文件同tcp_12_client\wifi_connecter.h
文件夹中创建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\wifi_connecter.c文件,文件同tcp_12_client\wifi_connecter.c
文件夹中创建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\network_mqtt_example.c文件
/*
* Copyright (c) 2023 Beijing HuaQing YuanJian Education Technology Co., Ltd
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include "ohos_init.h"
#include "cmsis_os2.h"
#include "network_mqtt.h"
#include "wifi_connecter.h"
// MQTT server address and port passed to MQTTClient_connectServer().
#define SERVER_IP_ADDR "192.168.137.1"
#define SERVER_IP_PORT 1883
// Topic this device subscribes to / publishes to.
#define MQTT_TOPIC_SUB "subTopic"
#define MQTT_TOPIC_PUB "pubTopic"
// Credentials passed to MQTTClient_init().
#define MQTT_CLIENT_ID "mqtt_client_123"
#define MQTT_USER_NAME "rtplay"
#define MQTT_PASSWORD "password"
// Stack size requested for both demo threads.
#define TASK_STACK_SIZE (1024 * 5)
#define TASK_INIT_TIME 2 // s, pause between setup steps and between publishes
#define MQTT_RECV_TASK_TIME (200 * 1000) // us, pause between receive polls
#define DELAY_TICKS_10 (10) // ticks passed to osDelay() before connecting to Wi-Fi
osThreadId_t mqtt_send_task_id; // thread that sets up the connection and publishes
osThreadId_t mqtt_recv_task_id; // thread that receives subscribed messages
/**
 * @brief Receive callback: log the topic and payload of a received message.
 * @param topic   NUL-terminated topic name.
 * @param payload NUL-terminated message payload.
 * @return Always 0. The function is declared int8_t, so it must return a value.
 */
int8_t mqtt_sub_payload_callback(unsigned char *topic, unsigned char *payload)
{
    printf("[info] topic:[%s] recv<== %s\r\n", topic, payload);
    return 0;
}
/* Receive thread body: poll for an incoming message, pause for
 * MQTT_RECV_TASK_TIME microseconds, and repeat forever. */
void mqtt_recv_task(void)
{
    for (;;) {
        MQTTClient_sub();
        usleep(MQTT_RECV_TASK_TIME);
    }
}
/*
 * Main demo thread: join the Wi-Fi hotspot, connect to the MQTT server,
 * initialise the MQTT session, subscribe, start the receive thread, then
 * publish a greeting every TASK_INIT_TIME seconds.
 *
 * If the TCP connection or the MQTT session cannot be established the thread
 * returns, because nothing after that point can succeed.
 */
void mqtt_send_task(void)
{
    unsigned char helloMsg[] = "hello world!!!";

    // Prepare the access-point parameters and join the hotspot.
    WifiDeviceConfig config = {0};
    strcpy_s(config.ssid, WIFI_MAX_SSID_LEN, PARAM_HOTSPOT_SSID);
    strcpy_s(config.preSharedKey, WIFI_MAX_KEY_LEN, PARAM_HOTSPOT_PSK);
    config.securityType = PARAM_HOTSPOT_TYPE;
    osDelay(DELAY_TICKS_10);
    // NOTE(review): ConnectToHotspot() is defined in wifi_connecter.c, which is
    // not visible here; its return value was stored but never used, so it
    // stays ignored.
    (void)ConnectToHotspot(&config);

    // Connect to the MQTT server.
    if (MQTTClient_connectServer(SERVER_IP_ADDR, SERVER_IP_PORT) != 0) {
        printf("[error] MQTTClient_connectServer\r\n");
        return;
    }
    printf("[success] MQTTClient_connectServer\r\n");
    sleep(TASK_INIT_TIME);

    // Initialise the MQTT client.
    if (MQTTClient_init(MQTT_CLIENT_ID, MQTT_USER_NAME, MQTT_PASSWORD) != 0) {
        printf("[error] MQTTClient_init\r\n");
        MQTTClient_unConnectServer(); // release the socket opened above
        return;
    }
    printf("[success] MQTTClient_init\r\n");
    sleep(TASK_INIT_TIME);

    // Subscribe to the topic. A failure is only reported; publishing still works.
    if (MQTTClient_subscribe(MQTT_TOPIC_SUB) != 0) {
        printf("[error] MQTTClient_subscribe\r\n");
    } else {
        printf("[success] MQTTClient_subscribe\r\n");
    }
    sleep(TASK_INIT_TIME);

    // Zero-initialise so fields that are not set explicitly are not indeterminate.
    osThreadAttr_t options = {0};
    options.name = "mqtt_recv_task";
    options.attr_bits = 0;
    options.cb_mem = NULL;
    options.cb_size = 0;
    options.stack_mem = NULL;
    options.stack_size = TASK_STACK_SIZE;
    options.priority = osPriorityNormal;
    mqtt_recv_task_id = osThreadNew((osThreadFunc_t)mqtt_recv_task, NULL, &options);
    if (mqtt_recv_task_id != NULL) {
        // osThreadId_t is a pointer type, so print it with %p.
        printf("ID = %p, Create mqtt_recv_task_id is OK!\r\n", (void *)mqtt_recv_task_id);
    } else {
        printf("Create mqtt_recv_task_id failed!\r\n");
    }

    while (1) {
        MQTTClient_pub(MQTT_TOPIC_PUB, helloMsg, (int)(sizeof(helloMsg) - 1));
        sleep(TASK_INIT_TIME);
    }
}
/*
 * Demo entry point, registered with SYS_RUN below: install the receive
 * callback and start the thread that runs mqtt_send_task().
 */
static void network_wifi_mqtt_example(void)
{
    printf("Enter network_wifi_mqtt_example()!\r\n");
    p_MQTTClient_sub_callback = &mqtt_sub_payload_callback;

    // Zero-initialise so fields that are not set explicitly are not indeterminate.
    osThreadAttr_t options = {0};
    options.name = "mqtt_send_task";
    options.attr_bits = 0;
    options.cb_mem = NULL;
    options.cb_size = 0;
    options.stack_mem = NULL;
    options.stack_size = TASK_STACK_SIZE;
    options.priority = osPriorityNormal;
    mqtt_send_task_id = osThreadNew((osThreadFunc_t)mqtt_send_task, NULL, &options);
    if (mqtt_send_task_id != NULL) {
        // osThreadId_t is a pointer type, so print it with %p.
        printf("ID = %p, Create mqtt_send_task_id is OK!\r\n", (void *)mqtt_send_task_id);
    } else {
        printf("Create mqtt_send_task_id failed!\r\n");
    }
}
SYS_RUN(network_wifi_mqtt_example);
使用build,编译成功后,使用upload进行烧录。
首先运行emqx
设置中文
运行后电脑ap会看到设备
串口会输出
网页中客户端会看到模块
主题也会看到
创建websocket连接服务
客户端已经有了两个
订阅模块的消息
可以收到模块发送的消息
发送模块订阅的消息
模块能收到消息