MQTT简介
MQTT是一种基于发布订阅模式的消息传输协议
消息:设备和设备之间传输的数据,或者服务和服务之间传输的数据
协议:传输数据时所遵循的规则
轻量级:MQTT协议占用的请求源较少,数据报文较小
可靠较强:多种消息的质量等级
安全性较强:提供传输层和套接层加密功能
双向通讯:客户端既可以发送数据,也可以从代理软件中获取数据
docker安装emqx 5.7
docker pull emqx/emqx:5.7
mkdir -p /data/docker/emqx/data /data/docker/emqx/log /data/docker/emqx/etc
#手动复制默认配置文件到宿主机
docker run -d --name emqx_temp emqx/emqx:5.7
docker cp emqx_temp:/opt/emqx/etc /data/docker/emqx
docker stop emqx_temp && docker rm emqx_temp
docker run -d --name emqx \
-u root \
-p 1883:1883 -p 8083:8083 \
-p 8084:8084 -p 8883:8883 \
-p 18083:18083 \
-v /data/docker/emqx/data:/opt/emqx/data \
-v /data/docker/emqx/log:/opt/emqx/log \
-v /data/docker/emqx/etc:/opt/emqx/etc \
emqx/emqx:5.7
#遇到权限问题:mkdir: cannot create directory ‘/opt/emqx/data/configs’: Permission denied
#加了参数 -u root,使用root身份启动
#--privileged=true
#如果忘记密码,可以进入docker容器修改
docker exec -it emqx /bin/bash
./bin/emqx_ctl admins passwd admin public123
EMQX后台管理页面
http://192.168.1.131:18083
admin/public
MQTTX客户端
在官网下载安装客户端 https://mqttx.app/zh
Docker安装MQTTX
docker pull emqx/mqttx-web:v1.10.1
docker run --rm --name mqttx-web -p 80:80 emqx/mqttx-web:v1.10.1
访问 http://192.168.1.131
如果是MQTTX客户端连接,使用 mqtt://192.168.1.131:1883
如果使用Docker安装MQTTX的,只能在网页中配置连接:ws://192.168.1.131:8083
wireshark网络监听工具
https://www.wireshark.org/download.html
下载并安装
打开Wireshark,监听网卡VMware Network Adapter VMnet8
在过滤器中输入mqtt,使用MQTTX桌面版本连接,可以查看报文的详细数据
QOS
消息的质量等级
0:消息最多发送一次
1:消息至少发送一次
2:消息仅有一次发送
在发送消息的时候可以指定消息的质量等级
QOS = 0:即发即弃,不需要等待确认,不需要存储和重传
QOS = 1:引用了应答和重传机制,在发送消息时缓存报文,报文前中包含Message Identifier,在应答ack中返回Message Identifier,删除缓存报文
QOS = 2:在接受端收到PUBREL消息之前,会缓存Packet ID,可以过滤重复消息
主题
对消息进行分类
不建议以 / 开头或结尾
单层通配符+ :必须占据整个层级 test/+ 或者 test/+/temperature
多层通配符# :必须是占据整个层级且是主题的最后一个字符 #或者test/#
系统主题#SYS/ :获取MQTT服务器自身运行状态、消息统计、客户端上下级事件等数据
参数配置
Clean Start:客户端在和服务器建立连接的时候尝试恢复之前的会话或者直接创建全新的会话
0:之前如果有连接,会尝试恢复之前的会话(可以接受到该客户端离线时,发布者后面发布的消息)
1:创建全新会话
Session Expiry Interval:决定会话状态数据在服务端的存储时长
0:会话在网络连接断开时立即结束
大于0:会话将在网络连接断开的多少秒之后过期
以上是MQTT会话为离线客户端缓存消息的能力
保留消息
普通消息:普通消息在发送前如果该主题不存在订阅者,MQTT服务器会直接将丢弃
保留消息:保留消息可以保留在MQTT服务器中,新的订阅者如果主题匹配,立接收到该消息
MQTT服务器会为每个主题存储最新一条保留消息
在保留消息发布前订阅主题,将不会收到保留消息(当普通消息接收)
使用场景
传感串上报数据间隔时间长,但订阅者需要在订阅后立即获取到新的数据
传感器的版本号、序列号等不会经常变更的属性
保留消息的删除
- 发送一条空的保留消息
- 在Dashboard页面删除
- 在发送保留消息时,设置保留消息的过期时间
发送消息时,可以指定消息过期时间
假如客户端意外离线,重新连接时如果消息已过期,则获取不到这条消息了(消息时效性:秒)
遗嘱消息
客户端可以在连接服务端中注册一个遗嘱消息,当该客户端意外断开连接,服务端就会向其他订阅了相应主题的客户端发送些遗嘱消息
Will Delay Interval:服务端将在网络连接关闭后延迟多久发布遗嘱消息(秒)
需要在连接前配置遗嘱消息
如果会话有效时间小于遗嘱延迟时间,则在会话结束前发送遗嘱消息
延迟发布
MQTT服务端收到发布者发布的消息以后,延迟一段时间以后再把消息转发给订阅者
延迟主题格式 ${delayed}/{DelayInterval}/{TopicName} 单位秒
Dashboard =》监控 =》延迟发布 =》设置,启用延迟发布
用户属性
用来发送一些自定义的内容
订阅选项
No local 服务端是否可以将消息转发给发布这个消息的客户端(默认值0:可以)
在桥接场景中,需要配置成1:不可以转发避免死循环
自动订阅:Dashboard =》MQTT高级特性 =》自动订阅,添加
黑名单:封禁某些客户端的访问
连接抖动检测:自动封禁那些被检测到短时间内频繁登录的客户端
共享订阅
相当于消费组,每个组消费同一个主题里所有的消息,一个组里面使用轮循等策略消费消息(并行消费及高可用性)
带群组的共享订阅:$share/<group-name>/{TopicName}
不带群组格式:$queue/{TopicName}
(相当于同一个消费组,组内并行消费)
排它订阅
一个主题同一时刻仅被允许存在一个订阅者 $exclusive/{TopicName}
使用docker compose创建Kafka测试环境
启动命令:docker compose -f docker-compose-kafka.yml up -d
services:
zookeeper:
image: wurstmeister/zookeeper
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
networks:
- kafka-net
kafka:
image: wurstmeister/kafka
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.131:9092,PLAINTEXT_HOST://192.168.1.131:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
networks:
- kafka-net
kafka-eagle:
image: nickzurich/efak
container_name: kafka-eagle
ports:
- "8048:8048"
environment:
EFAK_DB: h2
CLUSTER_ZK_LIST: zookeeper:2181/cluster1
CLUSTER_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
depends_on:
- kafka
networks:
- kafka-net
networks:
kafka-net:
driver: bridge
volumes:
kafka-data: {}
zookeeper-data: {}
Kafka管理后台
http://192.168.1.131:8048
admin/123456
创建主题:Topics =》Create,Topic Name:test_mqtt_topic
Java代码发送kafka消息
@SpringBootTest
class MqttKafkaDemoApplicationTests {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
public void testSendMsg() {
kafkaTemplate.send("test_mqtt_topic", "hello,kafka");
}
}
在kafka eagle的Topics =》List,点击进test_mqtt_topic里面,在右侧的Preview中,可以看到最近发送的消息
关闭服务 docker compose -f docker-compose-kafka.yml down
数据集成
使用Sink与Source与外部数据系统对接
Sink用于将消息从broker发送到外部数据系统
Source用于从外部系统接收消息
规则引擎:数据来源、数据处理过程、处理结果去向
数据集成示例
将客户端发往t/a主题中的消息输出到EMQX的控制台
#Dashboard =》集成 =》规则 =》创建,SQL编辑器
SELECT * FROM "t/a"
#数据输入为消息主题t/a
#动作输出选择控制台输出
SQL语法介绍
SELECT <字段名> FROM <主题> [WHERE <条件>]
SELECT a,b FROM 't/#' //未知的列会返回undefined
SELECT * FROM '#' WHERE username = 'abc'
SELECT clientid as cid FROM '#' WHERE cid = 'abc'
SELECT clientid as cid FROM '#' WHERE username = 'abc'
SELECT clientid as cid, payload, topic, qos FROM "t/a"
FOREACH <字段名> [DO <条件>] [INCASE <条件>] FROM <主题> [WHERE <条件>]
FOREACH 处理数组数据
FOREACH payload.sensors as e
DO clientid, e.name as name, e.idx as idx
INCASE e.idx >= 1 #对DO选择出来的某个字段施加条件过滤
from "t/b"
t/b测试数据
{
"date": "2025-04-15",
"sensors": [
{"name": "a", "idx": 0},
{"name": "b", "idx": 1},
{"name": "c", "idx": 2}
]
}
添加规则
FOREACH payload.sensors from "t/b"
添加动作类型:消息重发布
主题:sensors/${item.idx}
内容:${item.name}
添加3个订阅:sensors/1,sensors/2,sensors/3
使用DO简化输出结果
FOREACH payload.sensors as e
DO e.idx as idx, e.name as name
from "t/c"
再次添加动作类型:消息重发布
主题:sensors/
i
d
x
内容:
{idx} 内容:
idx内容:{name}
CASE-WHEN语句
SELECT CASE WHEN payload.x < 0 THEN 0
WHEN payload.x > 7 THEN 7
ELSE payload.x
END as x
FROM "t/abc"
内置函数
SELECT abs(-1) as x,
concat(payload.msg, ' goods') as name
FROM "t/aaa"
Webhook
Webhook将EMQX客户端消息和事件发送到外部HTTP服务中
@RestController
@RequestMapping("webHook")
public class WebHookController {
@PostMapping("notify")
public void notifyMsg(@RequestBody Map<Object, Object> body) {
System.out.println(body);
}
}
在Dashboard =》集成 =》Webhook 中创建 Webhook
名称notify_WH_D,触发器:消息发布,过滤主题:t/1
http://192.168.0.199:8080/webHook/notify
docker查看挂载的磁盘信息
docker volume ls
docker volume inspect emqx_log
在Vue中使用MQTT
npm create vite@latest
npm install
npm install element-plus --save
npm install mqtt --save
修改main.js
import { createApp } from 'vue'
import './style.css'
import App from './App.vue'
import ElementPlus from 'element-plus'
import 'element-plus/dist/index.css'
const app = createApp(App)
app.use(ElementPlus)
app.mount('#app')
添加页面MqttDemo.vue,连接emqx订阅发送消息
<script setup>
import { ref, reactive } from 'vue'
import mqtt from 'mqtt'
const qosList = [0, 1, 2]
// 定义连接参数的对象
const connectInfo = ref({
protocol: 'ws',
host: '192.168.1.131',
port: '8083',
clientId: 'emqx_vue_client_' + Math.random().toString().substring(2, 8),
username: 'zhangsan',
password: '123456'
})
const clientInitData = ref({
connected: false
})
const client = ref({})
const createConnection = () => {
const { protocol, host, port, ...options } = connectInfo.value
const connectUrl = `${protocol}://${host}:${port}/mqtt`
client.value = mqtt.connect(connectUrl, options)
clientInitData.value.connected = true
console.log('连接建立成功了')
}
const closeConnection = () => {
client.value.end(false, () => {
clientInitData.value.connected = false
console.log('连接关闭成功了')
})
}
const subscriptionInfo = ref({
topic: '',
qos: 0
})
const receivedMessages = ref({})
const subscriptionInitData = ref({
subscription: false
})
const subscriptionTopicHandler = () => {
const { topic, qos } = subscriptionInfo.value
client.value.subscribe(topic, { qos }, (error, res) => {
if (error) {
console.log('主题订阅失败了', error)
return
}
subscriptionInitData.value.subscription = true
//给链接对象注册一个接收消息的事件
client.value.on('message', (topic, message) => {
console.log('接收到消息:', topic, message)
receivedMessages.value = topic + '--->' + message
})
})
}
const unSubscriptionTopicHandler = () => {
const { topic, qos } = subscriptionInfo.value
client.value.unsubscribe(topic, { qos }, (error, res) => {
if (error) {
console.log('主题取消订阅失败了', error)
return
}
subscriptionInitData.value.subscription = false
})
}
const publishInfo = ref({
topic: '',
qos: 0,
payload: ''
})
const doPublish = () => {
const { topic, qos, payload } = publishInfo.value
client.value.publish(topic, payload, { qos }, (error, res) => {
if (error) {
console.log('发送消息失败了', error)
return
}
})
}
</script>
<template>
<el-card>
<h4>配置信息</h4>
<el-form label-width="120px">
<el-row :gutter="20">
<el-col :span="8">
<el-form-item label="协议" prop="protocol">
<el-select v-model="connectInfo.protocol">
<el-option label="ws://" value="ws"></el-option>
<el-option label="wss://" value="wss"></el-option>
</el-select>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item label="主机地址" prop="host">
<el-input v-model="connectInfo.host"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item label="端口" prop="port">
<el-input v-model="connectInfo.port"></el-input>
</el-form-item>
</el-col>
</el-row>
<el-row :gutter="20">
<el-col :span="8">
<el-form-item label="clientId" prop="clientId">
<el-input v-model="connectInfo.clientId"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item label="用户名" prop="username">
<el-input v-model="connectInfo.username"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item label="密码" prop="password">
<el-input v-model="connectInfo.password"></el-input>
</el-form-item>
</el-col>
</el-row>
<el-row :gutter="20">
<el-col :span="24">
<el-button type="primary" :disabled="clientInitData.connected"
@click="createConnection">建立连接</el-button>
<el-button type="danger" :disabled="!clientInitData.connected"
@click="closeConnection">断开连接</el-button>
</el-col>
</el-row>
</el-form>
</el-card>
<el-card>
<h4>订阅主题</h4>
<el-form label-width="120px">
<el-row :gutter="20">
<el-col :span="8">
<el-form-item label="Topic" prop="topic">
<el-input v-model="subscriptionInfo.topic"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item label="Qos" prop="qos">
<el-select v-model="subscriptionInfo.qos">
<el-option v-for="item in qosList" :label="item" :value="item"></el-option>
</el-select>
</el-form-item>
</el-col>
<el-col :span="8">
<el-button type="primary" :disabled="subscriptionInitData.subscription"
@click="subscriptionTopicHandler">订阅主题</el-button>
<el-button type="warning" :disabled="!subscriptionInitData.subscription"
@click="unSubscriptionTopicHandler">取消订阅</el-button>
</el-col>
</el-row>
</el-form>
</el-card>
<el-card>
<h4>发布消息</h4>
<el-form label-width="120px">
<el-row :gutter="20">
<el-col :span="8">
<el-form-item label="Topic" prop="topic">
<el-input v-model="publishInfo.topic"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item label="Qos" prop="field5">
<el-select v-model="publishInfo.qos">
<el-option v-for="item in qosList" :label="item" :value="item"></el-option>
</el-select>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item label="Payload" prop="payload">
<el-input v-model="publishInfo.payload"></el-input>
</el-form-item>
</el-col>
</el-row>
<el-row :gutter="20">
<el-col :span="24">
<el-button type="primary" @click="doPublish">发布消息</el-button>
</el-col>
</el-row>
</el-form>
</el-card>
<el-card>
<h4>接收到的消息</h4>
<el-form>
<el-input v-model="receivedMessages" style="width: 98%" :rows="5" type="textarea" />
</el-form>
</el-card>
</template>
<style scoped>
.el-row {
margin-bottom: 20px;
}
.el-col {
border-radius: 4px;
}
</style>
Java中使用客户端连接EMQX
- 在pom.xml中引入依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
- 添加测试类
@SpringBootTest
class MqttSpringDemoApplicationTests {
@Test
void contextLoads() {
}
@Test
public void testSendMsg() throws MqttException {
String serverURI = "tcp://192.168.1.131:1883";
String clientId = "paho_client_123";
MemoryPersistence memoryPersistence = new MemoryPersistence();
MqttClient mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("zhangsan");
options.setPassword("123456".toCharArray());
// 创建新的连接
options.setCleanSession(true);
mqttClient.connect(options);
System.out.println("连接创建成功了");
MqttMessage mqttMessage = new MqttMessage("hello mqtt".getBytes());
mqttMessage.setQos(0);
mqttClient.publish("a/c", mqttMessage);
System.out.println("消息发送成功");
mqttClient.disconnect();
mqttClient.close();
}
@Test
public void testReceiveMsg() throws MqttException {
String serverURI = "tcp://192.168.1.131:1883";
String clientId = "paho_client_123";
MemoryPersistence memoryPersistence = new MemoryPersistence();
MqttClient mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("zhangsan");
options.setPassword("123456".toCharArray());
// 创建新的连接
options.setCleanSession(true);
mqttClient.connect(options);
System.out.println("连接创建成功了");
mqttClient.subscribe("a/d", 2);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("接收到消息 " + topic + " " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
while (true);
}
@Test
public void testCreateConnection() throws MqttException {
String serverURI = "tcp://192.168.1.131:1883";
String clientId = "paho_client_123";
MemoryPersistence memoryPersistence = new MemoryPersistence();
MqttClient mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("zhangsan");
options.setPassword("123456".toCharArray());
// 创建新的连接
options.setCleanSession(true);
mqttClient.connect(options);
System.out.println("连接创建成功了");
while (true);
}
}
SpringBoot中通过SpringIntegration接入EMQX
在SpringBoot中通过SpringIntegration接入EMQX(MQTT消息服务器)可以更高效地管理数据流
利用SpringIntegration的通道(Channel)、适配器(Adapter)和消息处理器(MessageHandler)实现复杂的消息路由、转换和聚合。
关键组件说明
组件 | 作用 |
---|---|
MqttPahoMessageDrivenChannelAdapter | 入站适配器:订阅 EMQX 主题并将消息传递到输入通道。 |
MqttPahoMessageHandler | 出站适配器:从输出通道接收消息并发布到 EMQX 主题。 |
MessageChannel | 通道:连接适配器和处理器的管道,支持同步 / 异步模式。 |
@ServiceActivator | 服务激活器:处理通道中的消息并触发业务逻辑。 |
@MessagingGateway | 消息网关:提供接口简化消息发送到出站通道。 |
- pom.xml中引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
<version>3.5.10.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.55</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
- 配置项目参数
修改application.properties
spring.application.name=spring-integration-mqtt
spring.mqtt.username=zhangsan
spring.mqtt.password=123456
spring.mqtt.url=tcp://192.168.1.131:1883
spring.mqtt.subClientId=sub_client_id_123
spring.mqtt.subTopic=iot/lamp/line,iot/lamp/device/status
spring.mqtt.pubClientId=pub_client_id_123
spring.mqtt.apiUrl=http://192.168.1.131:18083
spring.mqtt.secretKey=6l6rxxo2trs3QJjfK0OUhPIzjKlOqAOlnhniCuhonsI
spring.mqtt.apiKey=99d1ecc382aa4d58
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.1.131:3306/lamp_test?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root
mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.mapper-locations=classpath*:/mapper/*Mapper.xml
httpclient.max-total=200
httpclient.default-max-per-route=20
httpclient.connect-timeout=10000
httpclient.socket-timeout=10000
配置类加载数据
@Configuration
public class MqttConfig {
@Autowired
private MqttConfigProperties mqttConfigProperties;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
options.setCleanSession(true);
options.setUserName(mqttConfigProperties.getUsername());
options.setPassword(mqttConfigProperties.getPassword().toCharArray());
clientFactory.setConnectionOptions(options);
return clientFactory;
}
}
@Configuration
public class HttpClientConfig {
@Value("${httpclient.max-total}")
private int maxTotal;
@Value("${httpclient.default-max-per-route}")
private int maxPerRoute;
@Value("${httpclient.connect-timeout}")
private int connectTimeout;
@Value("${httpclient.socket-timeout}")
private int socketTimeout;
@Bean
public PoolingHttpClientConnectionManager poolingHttpClientConnectionManager() {
PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager();
manager.setMaxTotal(maxTotal);
manager.setDefaultMaxPerRoute(maxPerRoute);
return manager;
}
@Bean
public CloseableHttpClient httpClient(PoolingHttpClientConnectionManager manager) {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(connectTimeout)
.setSocketTimeout(socketTimeout)
.build();
return HttpClients.custom()
.setConnectionManager(manager)
.setDefaultRequestConfig(requestConfig)
.build();
}
}
自动加载配置信息,创建文件
META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
#内容每行是一个类
com.achieve.mqtt.domain.MqttConfigProperties
- 入站适配器:订阅消息
@Configuration
public class MqttInboundConfig {
@Autowired
private MqttConfigProperties mqttConfigProperties;
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Autowired
private ReceiverMessageHandler receiverMessageHandler;
// 定义消息通道(订阅消息)
@Bean
public MessageChannel messageInboundChannel() {
return new DirectChannel();
}
/**
* 配置入站适配器,设置订阅主题,以及指定消息的相关属性
*/
@Bean
public MessageProducer messageProducer() {
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter =
new MqttPahoMessageDrivenChannelAdapter(
mqttConfigProperties.getUrl(),
mqttConfigProperties.getSubClientId(),
mqttClientFactory,
mqttConfigProperties.getSubTopic().split(","));
mqttPahoMessageDrivenChannelAdapter.setQos(1);
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());
return mqttPahoMessageDrivenChannelAdapter;
}
// 服务激活器:处理输入通道消息
@Bean
@ServiceActivator(inputChannel = "messageInboundChannel")
public MessageHandler messageHandler() {
return receiverMessageHandler;
}
}
@Component
public class ReceiverMessageHandler implements MessageHandler {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String payload = message.getPayload().toString();
MessageHeaders headers = message.getHeaders();
Object topicName = headers.get("mqtt_receivedTopic").toString();
System.out.println(payload);
System.out.println(message);
System.out.println(topicName);
}
}
- 从输出通道接收消息并发布到EMQX主题
@Configuration
public class MqttOutboundConfig {
@Autowired
private MqttConfigProperties mqttConfigProperties;
@Autowired
private MqttPahoClientFactory mqttClientFactory;
// 定义消息通道(发布消息)
@Bean
public MessageChannel messageOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "messageOutboundChannel")
public MessageHandler mqttOutboundMessageHandler() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
mqttConfigProperties.getUrl(),
mqttConfigProperties.getPubClientId(),
mqttClientFactory);
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("default");
messageHandler.setDefaultQos(0);
return messageHandler;
}
}
- 消息网关:提供接口简化消息发送到出站通道
//网关接口发送消息(用于发送消息到出站通道)
@MessagingGateway(defaultRequestChannel = "messageOutboundChannel")
public interface MqttGateway {
void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);
void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload);
}
@Component
public class MqttMessageSender {
@Autowired
private MqttGateway mqttGateway;
public void sendMsg(String topic, String payload) {
mqttGateway.sendMsgToMqtt(topic, payload);
}
public void sendMsg(String topic, int qos, String payload) {
mqttGateway.sendMsgToMqtt(topic, qos, payload);
}
}
- 测试发送消息
@SpringBootTest
class SpringIntegrationMqttApplicationTests {
@Autowired
private MqttMessageSender mqttMessageSender;
@Test
public void testSendMsg() {
mqttMessageSender.sendMsg("a/e", "hello world");
}
}
- 查询设备在线状态
@RestController
@RequestMapping("/api/lamp")
@Slf4j
public class LampApiController {
@Autowired
private MqttMessageSender mqttMessageSender;
@Autowired
private MqttConfigProperties mqttConfigProperties;
@Autowired
private CloseableHttpClient httpClient;
@GetMapping(value = "{deviceId}/{status}")
public String sendStatusLampMsg(@PathVariable String deviceId, @PathVariable Integer status) {
Map<String, Object> map = Map.of("deviceId", deviceId, "status", status);
String json = JSON.toJSONString(map);
mqttMessageSender.sendMsg("iot/lamp/server/line", json);
return "ok";
}
/**
* 查询设备是否在线,v4和v5版本返回值不同;认证使用Basic方式
* @param clientId
* @return
*/
@GetMapping(value = "online/{clientId}")
public boolean isClientOnline(@PathVariable String clientId) {
String apiUrl = mqttConfigProperties.getApiUrl() + "/api/v5/clients/" + clientId;
HttpGet httpGet = new HttpGet(apiUrl);
httpGet.setHeader(HttpHeaders.ACCEPT, "application/json");
String auth = Base64.getEncoder()
.encodeToString((mqttConfigProperties.getApiKey() + ":" + mqttConfigProperties.getSecretKey()).getBytes());
httpGet.setHeader("Authorization", "Basic " + auth);
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
int statusCode = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
if (statusCode == HttpStatus.SC_OK && entity != null) {
String json = EntityUtils.toString(entity, StandardCharsets.UTF_8);
log.info("isClientOnline data {}", json);
JSONObject jsonObject = JSON.parseObject(json);
return jsonObject.getBoolean("connected");
} else if (statusCode == HttpStatus.SC_NOT_FOUND) {
return false; // 设备不存在
} else {
throw new RuntimeException("EMQX API 请求失败: HTTP " + statusCode);
}
} catch (IOException e) {
throw new RuntimeException("网络通信异常", e);
}
}
}
测试地址及数据
http://localhost:8080/api/lamp/online/device-123
#上线
iot/lamp/line
{
"deviceId": "device-123456",
"status": 1
}
#服务器下发指令 http://localhost:8080/api/lamp/device-123456/1
iot/lamp/server/line
{
"deviceId": "device-123456",
"status": 1
}
#上报状态
iot/lamp/device/status
{
"deviceId": "device-123456",
"status": 1
}
源代码地址
https://gitee.com/galen.zhang/mqtt-demo