环境:
开发板:Panduola(stm32L475)
KEIL5 开发环境
rtthread 4.0.3内核
使用ENV 配置Rtt
MQTT
1.MQTT介绍
客户端 Client
- 使用MQTT的程序或设备。客户端总是通过网络连接到服务端。
- 它可以发布应用消息给其它相关的客户端。
- 订阅以请求接受相关的应用消息。
- 取消订阅以移除接受应用消息的请求。
- 从服务端断开连接。服务端 Server一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。
服务端
-
接受来自客户端的网络连接。
-
接受客户端发布的应用消息。
-
处理客户端的订阅和取消订阅请求。
-
转发应用消息给符合条件的已订阅客户端。
订阅 Subscription
-
订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅与单个会话(Session)关联。会话可以包含多于一个的订阅。会话的每个订阅都有一个不同的主题过滤器。
-
QoS0,At most once,至多一次;Sender 发送的一条消息,Receiver 最多能收到一次,如果发送失败,也就算了。
-
QoS1,At least once,至少一次;Sender 发送的一条消息,Receiver 至少能收到一次,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但Receiver 有可能会收到重复的消息
-
QoS2,Exactly once,确保只有一次。Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
2.MQTT协议数据包结构
一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、有效载荷(payload)三部分构成。
(1) 固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
(2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
(3)有效载荷(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。
操作:
添加AT指令部分,使用ESP8266连接网络:
RT-Thread online packages --->
IoT - internet of things --->
[*] AT DEVICE: RT-Thread AT component porting or samples for different device
[*] Espressif ESP8266 --->
1. 添加RTT组件包:
MQTT:
CJSON:
AHT10:使用AHT10还需要打开I2c的驱动部分,使用旧版本1.0的AHT驱动,可以避免使用Sensor的框架
启用I2c总线:
2.使用scons --target=mdk5生成mdk工程
3.使用MQtt客户端:
修改连接参数:
#define MQTT_URI “tcp://192.168.1.110:1883”
#define MQTT_USERNAME “panduola”
#define MQTT_PASSWORD “panduola”
#define MQTT_SUBTOPIC “/test/topic2”
#define MQTT_PUBTOPIC “/test/topic1”
可在rtconfig.h中修改wifi连接参数:
-
创建一个客户端:
static MQTTClient client;
-
初始化客户端:
/* 创建与配置 mqtt 客户端 */
static void mq_start(void)
{
/* 初始 condata 参数 */
MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
static char cid[20] = {0};
static int is_started = 0;
if (is_started)
{
return;
}
/* 配置 MQTT 文本参数 */
{
client.isconnected = 0;
client.uri = MQTT_URI;
/* 生成随机客户端 ID */
rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
// rt_snprintf(sup_pub_topic, sizeof(sup_pub_topic), "%s%s", MQTT_PUBTOPIC, cid);
/* 配置连接参数 */
memcpy(sup_pub_topic, MQTT_SUBTOPIC, sizeof(MQTT_SUBTOPIC));
memcpy(&client.condata, &condata, sizeof(condata));
client.condata.clientID.cstring = cid;
client.condata.keepAliveInterval = 60;
client.condata.cleansession = 1;
client.condata.username.cstring = MQTT_USERNAME;
client.condata.password.cstring = MQTT_PASSWORD;
/* 配置 mqtt 参数 */
client.condata.willFlag = 0;
client.condata.will.qos = 1;
client.condata.will.retained = 0;
client.condata.will.topicName.cstring = sup_pub_topic;
client.buf_size = client.readbuf_size = 1024;
client.buf = malloc(client.buf_size);
client.readbuf = malloc(client.readbuf_size);
if (!(client.buf && client.readbuf))
{
LOG_E("no memory for MQTT client buffer!");
goto _exit;
}
/* 设置事件回调 */
client.connect_callback = mqtt_connect_callback;
client.online_callback = mqtt_online_callback;
client.offline_callback = mqtt_offline_callback;
/* 设置要订阅的 topic 和 topic 对应的回调函数 */
client.messageHandlers[0].topicFilter = MQTT_SUBTOPIC;
client.messageHandlers[0].callback = mqtt_sub_callback;
client.messageHandlers[0].qos = QOS1;
/* 设置默认订阅回调函数 */
client.defaultMessageHandler = mqtt_sub_default_callback;
}
/* 启动 MQTT 客户端 */
LOG_D("Start mqtt client and subscribe topic:%s", sup_pub_topic);
paho_mqtt_start(&client);
is_started = 1;
_exit:
return;
}
- 设置收到信息后的回调:
static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
{
printf("Receive message ,Topic: %.*s,payload:\n", msg_data->topicName->lenstring.len,msg_data->topicName->lenstring.data);
// 解析JSON消息
cJSON *json_obj = cJSON_Parse((const char *)msg_data->message->payload);
if (json_obj == NULL)
{
printf("Failed to parse JSON message\n");
return;
}
cJSON *object = RT_NULL;
object = cJSON_GetObjectItem(json_obj, "location");
if (object != NULL)
{
printf("Location: %s\n", object->valuestring);
}
object = cJSON_GetObjectItem(json_obj, "led");
if (object != NULL)
{
if(object->type == cJSON_True)
printf("led: ture\n");
else
printf("led: false\n");
}
cJSON_Delete(json_obj);
return;
}
- 发布消息:
/* MQTT 消息发布函数 */
static void mq_publish(const char *send_str)
{
MQTTMessage message;
const char *msg_str = send_str;
const char *topic = MQTT_PUBTOPIC;
message.qos = QOS1;
message.retained = 0;
message.payload = (void *)msg_str;
message.payloadlen = strlen(message.payload);
MQTTPublish(&client, topic, &message);
return;
}
常用的cjson函数:
void cJSON_Delete(cJSON *c)
删除 cJSON 指针,释放空间
char *cJSON_Print(cJSON *item)
cJSON数据解析成JSON字符串,并会在堆中开辟一块char *的内存空间,存放JSON字符串。
函数成功后会返回一个char *指针,该指针指向位于堆中JSON字符串。
cJSON *cJSON_Parse(const char *value)
将一个JSON数据包,按照cJSON结构体的结构序列化整个数据包,并在堆中开辟一块内存存储cJSON结构体
返回值:成功返回一个指向内存块中的cJSON的指针,失败返回NULL
cJSON *cJSON_GetObjectItem(cJSON *object,const char *string)
获取JSON字符串字段值,成功返回一个指向cJSON类型的结构体指针,失败返回NULL
常用的mqtt的API:
/**
* This function send an MQTT subscribe packet and wait for suback before returning.
*
* @param client the pointer of MQTT context structure
* @param qos MQTT Qos type, only support QOS1
* @param topic topic filter name
* @param callback the pointer of subscribe topic receive data function
*
* @return the error code, 0 on start successfully.
*/
/*订阅主题,最后一个形参是void (*subscribe_cb)(MQTTClient *client, MessageData *data);类型的函数指针*/
int paho_mqtt_subscribe(MQTTClient *client, enum QoS qos, const char *topic, subscribe_cb callback);
/**
* This function publish message to specified mqtt topic.
* @note it will be discarded, recommend to use "paho_mqtt_publish"
*
* @param c the pointer of MQTT context structure
* @param topicFilter topic filter name
* @param message the pointer of MQTTMessage structure
*
* @return the error code, 0 on subscribe successfully.
*/
int MQTTPublish(MQTTClient *client, const char *topic, MQTTMessage *message);
source:
#include <rtthread.h>
#include <rtdevice.h>
#include <board.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include "paho_mqtt.h"
// #include "wifi_config.h"
#include "aht10.h"
#define DBG_TAG "main"
#define DBG_LVL DBG_LOG
#include <rtdbg.h>
#include <cJSON.h>
/**
* MQTT URI farmat:
* domain mode
* tcp://iot.eclipse.org:1883
*
* ipv4 mode
* tcp://192.168.10.1:1883
* ssl://192.168.10.1:1884
*
* ipv6 mode
* tcp://[fe80::20c:29ff:fe9a:a07e]:1883
* ssl://[fe80::20c:29ff:fe9a:a07e]:1884
*/
#define MQTT_URI "tcp://192.168.1.110:1883"
#define MQTT_USERNAME "panduola"
#define MQTT_PASSWORD "panduola"
#define MQTT_SUBTOPIC "/test/topic2"
#define MQTT_PUBTOPIC "/test/topic1"
#define LED_PIN GET_PIN(E, 8)
/* define MQTT client context */
static MQTTClient client;
static void mq_start(void);
static void mq_publish(const char *send_str);
char sup_pub_topic[48] = MQTT_PUBTOPIC;
char sup_sub_topic[48] = MQTT_SUBTOPIC;
static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
{
printf("Receive message ,Topic: %.*s,payload:\n", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data);
// 解析JSON消息
cJSON *json_obj = cJSON_Parse((const char *)msg_data->message->payload);
if (json_obj == NULL)
{
printf("Failed to parse JSON message\n");
return;
}
cJSON *object = RT_NULL;
object = cJSON_GetObjectItem(json_obj, "location");
if (object != NULL)
{
printf("Location: %s\n", object->valuestring);
}
object = cJSON_GetObjectItem(json_obj, "led");
if (object != NULL)
{
if (object->type == cJSON_True)
{
printf("led: ture\n");
rt_pin_write(LED_PIN, PIN_LOW);
}
else
{
printf("led: false\n");
rt_pin_write(LED_PIN, PIN_HIGH);
}
}
cJSON_Delete(json_obj);
return;
}
static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
{
*((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
LOG_D("mqtt sub default callback: %.*s %.*s",
msg_data->topicName->lenstring.len,
msg_data->topicName->lenstring.data,
msg_data->message->payloadlen,
(char *)msg_data->message->payload);
return;
}
static void mqtt_connect_callback(MQTTClient *c)
{
LOG_I("Start to connect mqtt server");
}
static void mqtt_online_callback(MQTTClient *c)
{
LOG_D("Connect mqtt server success");
LOG_D("Publish message: Hello,RT-Thread! to topic: %s", sup_pub_topic);
mq_publish("Hello,RT-Thread!");
}
static void mqtt_offline_callback(MQTTClient *c)
{
LOG_I("Disconnect from mqtt server");
}
/* 创建与配置 mqtt 客户端 */
static void mq_start(void)
{
/* 初始 condata 参数 */
MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
static char cid[20] = {0};
static int is_started = 0;
if (is_started)
{
return;
}
/* 配置 MQTT 文本参数 */
{
client.isconnected = 0;
client.uri = MQTT_URI;
/* 生成随机客户端 ID */
rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
// rt_snprintf(sup_pub_topic, sizeof(sup_pub_topic), "%s%s", MQTT_PUBTOPIC, cid);
/* 配置连接参数 */
memcpy(sup_pub_topic, MQTT_SUBTOPIC, sizeof(MQTT_SUBTOPIC));
memcpy(&client.condata, &condata, sizeof(condata));
client.condata.clientID.cstring = cid;
client.condata.keepAliveInterval = 60;
client.condata.cleansession = 1;
client.condata.username.cstring = MQTT_USERNAME;
client.condata.password.cstring = MQTT_PASSWORD;
/* 配置 mqtt 参数 */
client.condata.willFlag = 0;
client.condata.will.qos = 1;
client.condata.will.retained = 0;
client.condata.will.topicName.cstring = sup_pub_topic;
client.buf_size = client.readbuf_size = 1024;
client.buf = malloc(client.buf_size);
client.readbuf = malloc(client.readbuf_size);
if (!(client.buf && client.readbuf))
{
LOG_E("no memory for MQTT client buffer!");
goto _exit;
}
/* 设置事件回调 */
client.connect_callback = mqtt_connect_callback;
client.online_callback = mqtt_online_callback;
client.offline_callback = mqtt_offline_callback;
/* 设置要订阅的 topic 和 topic 对应的回调函数 */
client.messageHandlers[0].topicFilter = MQTT_SUBTOPIC;
client.messageHandlers[0].callback = mqtt_sub_callback;
client.messageHandlers[0].qos = QOS1;
/* 设置默认订阅回调函数 */
client.defaultMessageHandler = mqtt_sub_default_callback;
}
/* 启动 MQTT 客户端 */
LOG_D("Start mqtt client and subscribe topic:%s", sup_pub_topic);
paho_mqtt_start(&client);
is_started = 1;
_exit:
return;
}
/* MQTT 消息发布函数 */
static void mq_publish(const char *send_str)
{
MQTTMessage message;
const char *msg_str = send_str;
const char *topic = MQTT_PUBTOPIC;
message.qos = QOS1;
message.retained = 0;
message.payload = (void *)msg_str;
message.payloadlen = strlen(message.payload);
MQTTPublish(&client, topic, &message);
return;
}
rt_thread_t TH_Get_HU;
rt_thread_t Publish_value;
float humidity, temperature;
void get_humi_temp(void *parameter)
{
aht10_device_t dev;
const char *i2c_bus_name = "i2c2";
int count = 0;
rt_thread_mdelay(2000);
dev = aht10_init(i2c_bus_name);
if (dev == RT_NULL)
{
LOG_E(" The sensor initializes failure");
}
while (count++ < 100)
{
humidity = aht10_read_humidity(dev);
// LOG_D("humidity : %d.%d %%", (int)humidity, (int)(humidity * 10) % 10);
temperature = aht10_read_temperature(dev);
// LOG_D("temperature: %d.%d", (int)temperature, (int)(temperature * 10) % 10);
rt_thread_mdelay(1000);
}
}
void Publish_Date(void *parameter)
{
char send_str[128];
while (1)
{
// sprintf(send_str, "{\"temperature\":%d.%d,\"humidity\":%d.%d}", (int)temperature, (int)(temperature * 10) % 10, (int)humidity, (int)(humidity * 10) % 10);
sprintf(send_str, "{\"location\":\"10#A401\",\"led\":true,\"environment\":{\"temperature\":%d.%d,\"humidity\":%d.%d}}", (int)temperature, (int)(temperature * 10) % 10, (int)humidity, (int)(humidity * 10) % 10);
mq_publish(send_str);
rt_thread_mdelay(1000);
}
}
int main(void)
{
mq_start();
TH_Get_HU = rt_thread_create("get_humi_temp", get_humi_temp, RT_NULL, 1024, 20, 10);
Publish_value = rt_thread_create("Publish_value", Publish_Date, RT_NULL, 1024, 20, 10);
rt_pin_mode(LED_PIN, PIN_MODE_OUTPUT);
rt_thread_startup(TH_Get_HU);
rt_thread_mdelay(1000);
rt_thread_startup(Publish_value);
}