目录
概述
1 配置EMQX服务器
1.1 搭建EMQX服务器
1.2 配置服务器参数
2 ESP32实现MQTT Client
2.1 创建MQTT Client项目
2.2 实现MQTT Client
2.3 ESP32连接EMQX
3 ESP32Client实现广播和订阅消息
3.1 广播消息
3.1.1 编写广播消息函数
3.1.2 下载和验证
3.1.3 订阅ESP32 Client消息
3.2 订阅消息
3.2.1 MQTT.fx定义广播消息
3.2.2 ESP32 Client订阅消息
3.2.3 下载和验证
4 ESP32控制外围硬件
4.1 功能描述
4.2 代码实现
4.3 下载和验证
5 源代码文件
使用ESP32和EMQX搭建物联网平台,实现消息发布和订阅功能
概述
本文主要介绍使用ESP32实现MQTT Client, 连接至使用EMQX自建服务器上,使用MQTT.fx模拟客户端发布和订阅来自ESP32实现MQTT Client的消息,同时ESP32实现MQTT Client也订阅来自MQTT.fx的消息。还使用MQTT.fx发布消息的方式来触发ESP32控制外围设备。
1 配置EMQX服务器
1.1 搭建EMQX服务器
登录如下网址,下载EMQX数据包,搭建一台自己的EMQX服务器。
https://www.emqx.io/zh/downloads?os=Ubuntu
笔者使用ubuntu系统搭建了一台EMQX服务器,其具体步骤如下:
1.2 配置服务器参数
安装完成服务器,登录系统
在客户端认证 pannel 添加用户端信息,笔者添加了两个mqtt_user,分别为:
mqtt_user-1 : 用于MQTT.fx 客户端订阅和发布消息
mqtt_user-2: 用于ESP32 客户端订阅和发布消息
配置参数时,记录下用户名和密码信息,两个参数对应:
AccessToken: 用户名
ProjectKey: 密码
2 ESP32实现MQTT Client
2.1 创建MQTT Client项目
打开Arduino IED, 在Tools--->Mange librarys下,搜索AsyncMqtt_Generic这个SDK,然后安装这个SDK.
安装完毕之后,在对应的SDK提供的Example中选择一个模版,然后添加自己的代码。
2.2 实现MQTT Client
笔者选择 FullyFeature_ESP32.ino 作为模版项目。使用Arduino IED打开项目文件,如下参数需要根据实际情况一一配置。
参数功能介绍如下:
参数 | 描述 |
---|---|
WIFI_SSID | wifi 账号名称 |
WIFI_PASSWORD | wifi密码 |
MQTT_HOST | MQTT服务器地址 |
USERNAME | 设备用户名 |
PASSWORD | 设备用户认证密码 |
2.3 ESP32连接EMQX
配置好参数后,下载代码至ESP32。下载代码时注意,正确的选择设备及其对应的端口号。下载完毕代码后,ESP32会连接EMQX服务器
在ESP32的log输出端口也能看见相应的信息:
3 ESP32Client实现广播和订阅消息
3.1 广播消息
3.1.1 编写广播消息函数
实现功能:ESP32 MQTT Client 广播温度湿度数据
{
"temperature": 22.5,
"light": 2000,
"humidity": 62.3
}
代码实现:
void pubSensors()
{
mqttClient.publish("temperature", 0, true, "22.5");
mqttClient.publish("humidity", 1, true, "62.5");
mqttClient.publish("light", 2, true, "2000");
}
在loop函数中,调用该广播函数:
3.1.2 下载和验证
下载代码至ESP32:
下载完成后,登录到EMQX服务器上,可以看见,ESP32 MQTT Client已经发布信息至服务器:
3.1.3 订阅ESP32 Client消息
使用MQTT.fx订阅ESP32 Client消息,具体方法如下:
在EMQX上可以看见mqtt_user1:
使用MQTT.fx分别订阅如下Topic
{
"temperature",
"light",
"humidity"
}
可以看见MQTT.fx收到ESP32发布的消息,在EMQX服务器上同时可以看见如下信息:mqtt_user1已经订阅了这三个Topic。
3.2 订阅消息
3.2.1 MQTT.fx定义广播消息
在 MQTT.fx定义广播消息的类型,其分别为:
bool 类型消息: true/false
{
"switch": true
}
字符串型消息(attributes):"ON"/"OFF"
{
"attributes": "ON"
}
3.2.2 ESP32 Client订阅消息
ESP32 Client实现订阅消息功能,其具体代码如下:
void subscribeSensors()
{
mqttClient.subscribe("attributes", 1);
mqttClient.subscribe("switch", 1);
}
函数被onMqttConnect()函数调用
3.2.3 下载和验证
编译代码下载到ESP32,登录EMQX查看消息情况。在服务器上看见mqtt_user2已经订阅了这两条Topic
4 ESP32控制外围硬件
4.1 功能描述
使用ESP32订阅的Topic: switch功能控制LED, 当ESP32收到:
订阅消息-1: 打开led
{
"switch": true
}
订阅消息-2: 关闭led
{
"switch": false
}
4.2 代码实现
代码214: 解析playload为json格式
代码221:判断是否包含key
代码224:比较value值,value = true ,执行开灯
4.3 下载和验证
编译代码,下载至ESP32中,然后在MQTT.fx中广播消息
1)验证开灯功能
2)验证关灯功能
5 源代码文件
#include <ArduinoJson.h>
#include "defines.h"
#include <WiFi.h>
extern "C"
{
#include "freertos/FreeRTOS.h"
#include "freertos/timers.h"
}
#include <AsyncMqtt_Generic.h>
#define MQTT_HOST IPAddress(192, 168, 1, 11)
#define MQTT_PORT 1883
#define WIFI_SSID ""
#define WIFI_PASSWORD ""
#define USERNAME "mqtt_user2"
#define PASSWORD "123456"
// 设置 LED GPIO 引脚
const int LED_PIN = 2;
const char *PubTopic = "async-mqtt/ESP32_Pub"; // Topic to publish
AsyncMqttClient mqttClient;
TimerHandle_t mqttReconnectTimer;
TimerHandle_t wifiReconnectTimer;
unsigned long timer1 = millis();
const int report_interval = 1000 * 30;
void pubSensors();
void subscribeSensors();
void connectToWifi()
{
Serial.println("Connecting to Wi-Fi...");
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
}
void connectToMqtt()
{
Serial.println("Connecting to MQTT...");
mqttClient.connect();
}
void WiFiEvent(WiFiEvent_t event)
{
switch (event)
{
#if USING_CORE_ESP32_CORE_V200_PLUS
case ARDUINO_EVENT_WIFI_READY:
Serial.println("WiFi ready");
break;
case ARDUINO_EVENT_WIFI_STA_START:
Serial.println("WiFi STA starting");
break;
case ARDUINO_EVENT_WIFI_STA_CONNECTED:
Serial.println("WiFi STA connected");
break;
case ARDUINO_EVENT_WIFI_STA_GOT_IP6:
case ARDUINO_EVENT_WIFI_STA_GOT_IP:
Serial.println("WiFi connected");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
connectToMqtt();
break;
case ARDUINO_EVENT_WIFI_STA_LOST_IP:
Serial.println("WiFi lost IP");
break;
case ARDUINO_EVENT_WIFI_STA_DISCONNECTED:
Serial.println("WiFi lost connection");
xTimerStop(mqttReconnectTimer, 0); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
xTimerStart(wifiReconnectTimer, 0);
break;
#else
case SYSTEM_EVENT_STA_GOT_IP:
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
connectToMqtt();
break;
case SYSTEM_EVENT_STA_DISCONNECTED:
Serial.println("WiFi lost connection");
xTimerStop(mqttReconnectTimer, 0); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
xTimerStart(wifiReconnectTimer, 0);
break;
#endif
default:
break;
}
}
void printSeparationLine()
{
Serial.println("************************************************");
}
void onMqttConnect(bool sessionPresent)
{
Serial.print("Connected to MQTT broker: ");
Serial.print(MQTT_HOST);
Serial.print(", port: ");
Serial.println(MQTT_PORT);
Serial.print("PubTopic: ");
Serial.println(PubTopic);
printSeparationLine();
Serial.print("Session present: ");
Serial.println(sessionPresent);
uint16_t packetIdSub = mqttClient.subscribe(PubTopic, 2);
Serial.print("Subscribing at QoS 2, packetId: ");
Serial.println(packetIdSub);
mqttClient.publish(PubTopic, 0, true, "ESP32 Test");
Serial.println("Publishing at QoS 0");
uint16_t packetIdPub1 = mqttClient.publish(PubTopic, 1, true, "test 2");
Serial.print("Publishing at QoS 1, packetId: ");
Serial.println(packetIdPub1);
uint16_t packetIdPub2 = mqttClient.publish(PubTopic, 2, true, "test 3");
Serial.print("Publishing at QoS 2, packetId: ");
Serial.println(packetIdPub2);
printSeparationLine();
subscribeSensors();
}
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
(void) reason;
Serial.println("Disconnected from MQTT.");
if (WiFi.isConnected())
{
xTimerStart(mqttReconnectTimer, 0);
}
}
void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos)
{
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
Serial.print(" qos: ");
Serial.println(qos);
}
void onMqttUnsubscribe(const uint16_t& packetId)
{
Serial.println("Unsubscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}
void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
const size_t& len, const size_t& index, const size_t& total)
{
(void) payload;
Serial.println("Publish received.");
Serial.print(" topic: ");
Serial.println(topic);
Serial.print(" qos: ");
Serial.println(properties.qos);
Serial.print(" dup: ");
Serial.println(properties.dup);
Serial.print(" retain: ");
Serial.println(properties.retain);
Serial.print(" len: ");
Serial.println(len);
Serial.print(" index: ");
Serial.println(index);
Serial.print(" total: ");
Serial.println(total);
Serial.println("attributes get response: ");
Serial.println(payload);
DynamicJsonDocument doc(512);
DeserializationError error = deserializeJson(doc, payload);
if (error)
{
Serial.printf("deserialize error: %s\n", error.f_str());
return;
}
JsonObject obj = doc.as<JsonObject>();
if (obj.containsKey("switch"))
{
if (obj["switch"] == true)
{
Serial.println("switch ON");
// todo 输出 GPIO 控制继电器
digitalWrite(LED_PIN, HIGH);
}
else
{
Serial.println("switch OFF");
// todo 输出 GPIO 控制继电器
digitalWrite(LED_PIN, LOW);
}
}
}
void onMqttPublish(const uint16_t& packetId)
{
Serial.println("Publish acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}
void setup()
{
Serial.begin(115200);
while (!Serial && millis() < 5000);
Serial.print("\nStarting FullyFeature_ESP32 on ");
Serial.println(ARDUINO_BOARD);
Serial.println(ASYNC_MQTT_GENERIC_VERSION);
mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0,
reinterpret_cast<TimerCallbackFunction_t>(connectToMqtt));
wifiReconnectTimer = xTimerCreate("wifiTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0,
reinterpret_cast<TimerCallbackFunction_t>(connectToWifi));
WiFi.onEvent(WiFiEvent);
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onSubscribe(onMqttSubscribe);
mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onMessage(onMqttMessage);
mqttClient.onPublish(onMqttPublish);
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
mqttClient.setCredentials(USERNAME,PASSWORD);
mqttClient.setClientId( USERNAME );
connectToWifi();
// 拉低 LED
pinMode(LED_PIN, OUTPUT);
digitalWrite(LED_PIN, LOW);
}
void subscribeSensors()
{
mqttClient.subscribe("attributes", 1);
mqttClient.subscribe("switch", 1);
}
void pubSensors()
{
mqttClient.publish("temperature", 0, true, "22.5");
mqttClient.publish("humidity", 1, true, "62.5");
mqttClient.publish("light", 2, true, "2000");
}
void loop()
{
// 按间隔时间上报传感器数据
if (millis() - timer1 > report_interval)
{
timer1 = millis();
pubSensors();
}
}