EMQX学习笔记

news2025/4/22 23:24:11

MQTT简介

MQTT是一种基于发布订阅模式的消息传输协议
消息:设备和设备之间传输的数据,或者服务和服务之间传输的数据
协议:传输数据时所遵循的规则

轻量级:MQTT协议占用的请求源较少,数据报文较小
可靠较强:多种消息的质量等级
安全性较强:提供传输层和套接层加密功能
双向通讯:客户端既可以发送数据,也可以从代理软件中获取数据


docker安装emqx 5.7

docker pull emqx/emqx:5.7
mkdir -p /data/docker/emqx/data /data/docker/emqx/log /data/docker/emqx/etc

#手动复制默认配置文件到宿主机
docker run -d --name emqx_temp emqx/emqx:5.7
docker cp emqx_temp:/opt/emqx/etc /data/docker/emqx
docker stop emqx_temp && docker rm emqx_temp


docker run -d --name emqx \
  -u root \
  -p 1883:1883 -p 8083:8083 \
  -p 8084:8084 -p 8883:8883 \
  -p 18083:18083 \
  -v /data/docker/emqx/data:/opt/emqx/data \
  -v /data/docker/emqx/log:/opt/emqx/log \
  -v /data/docker/emqx/etc:/opt/emqx/etc \
  emqx/emqx:5.7
#遇到权限问题:mkdir: cannot create directory ‘/opt/emqx/data/configs’: Permission denied
#加了参数 -u root,使用root身份启动
#--privileged=true

#如果忘记密码,可以进入docker容器修改
docker exec -it emqx /bin/bash
./bin/emqx_ctl admins passwd admin public123

EMQX后台管理页面
http://192.168.1.131:18083
admin/public


MQTTX客户端

在官网下载安装客户端 https://mqttx.app/zh

Docker安装MQTTX

docker pull emqx/mqttx-web:v1.10.1
docker run --rm --name mqttx-web -p 80:80 emqx/mqttx-web:v1.10.1

访问 http://192.168.1.131

如果是MQTTX客户端连接,使用 mqtt://192.168.1.131:1883
如果使用Docker安装MQTTX的,只能在网页中配置连接:ws://192.168.1.131:8083

wireshark网络监听工具

https://www.wireshark.org/download.html
下载并安装

打开Wireshark,监听网卡VMware Network Adapter VMnet8
在过滤器中输入mqtt,使用MQTTX桌面版本连接,可以查看报文的详细数据


QOS

消息的质量等级
0:消息最多发送一次
1:消息至少发送一次
2:消息仅有一次发送

在发送消息的时候可以指定消息的质量等级
QOS = 0:即发即弃,不需要等待确认,不需要存储和重传
QOS = 1:引用了应答和重传机制,在发送消息时缓存报文,报文前中包含Message Identifier,在应答ack中返回Message Identifier,删除缓存报文
QOS = 2:在接受端收到PUBREL消息之前,会缓存Packet ID,可以过滤重复消息


主题

对消息进行分类
不建议以 / 开头或结尾
单层通配符+ :必须占据整个层级 test/+ 或者 test/+/temperature
多层通配符# :必须是占据整个层级且是主题的最后一个字符 #或者test/#

系统主题#SYS/ :获取MQTT服务器自身运行状态、消息统计、客户端上下级事件等数据


参数配置

Clean Start:客户端在和服务器建立连接的时候尝试恢复之前的会话或者直接创建全新的会话
0:之前如果有连接,会尝试恢复之前的会话(可以接受到该客户端离线时,发布者后面发布的消息)
1:创建全新会话

Session Expiry Interval:决定会话状态数据在服务端的存储时长
0:会话在网络连接断开时立即结束
大于0:会话将在网络连接断开的多少秒之后过期

以上是MQTT会话为离线客户端缓存消息的能力


保留消息

普通消息:普通消息在发送前如果该主题不存在订阅者,MQTT服务器会直接将丢弃
保留消息:保留消息可以保留在MQTT服务器中,新的订阅者如果主题匹配,立接收到该消息

MQTT服务器会为每个主题存储最新一条保留消息
在保留消息发布前订阅主题,将不会收到保留消息(当普通消息接收)

使用场景
传感串上报数据间隔时间长,但订阅者需要在订阅后立即获取到新的数据
传感器的版本号、序列号等不会经常变更的属性

保留消息的删除

  1. 发送一条空的保留消息
  2. 在Dashboard页面删除
  3. 在发送保留消息时,设置保留消息的过期时间

发送消息时,可以指定消息过期时间
假如客户端意外离线,重新连接时如果消息已过期,则获取不到这条消息了(消息时效性:秒)


遗嘱消息

客户端可以在连接服务端中注册一个遗嘱消息,当该客户端意外断开连接,服务端就会向其他订阅了相应主题的客户端发送些遗嘱消息

Will Delay Interval:服务端将在网络连接关闭后延迟多久发布遗嘱消息(秒)
需要在连接前配置遗嘱消息
如果会话有效时间小于遗嘱延迟时间,则在会话结束前发送遗嘱消息


延迟发布

MQTT服务端收到发布者发布的消息以后,延迟一段时间以后再把消息转发给订阅者

延迟主题格式 ${delayed}/{DelayInterval}/{TopicName} 单位秒
Dashboard =》监控 =》延迟发布 =》设置,启用延迟发布


用户属性

用来发送一些自定义的内容


订阅选项

No local 服务端是否可以将消息转发给发布这个消息的客户端(默认值0:可以)
在桥接场景中,需要配置成1:不可以转发避免死循环

自动订阅:Dashboard =》MQTT高级特性 =》自动订阅,添加

黑名单:封禁某些客户端的访问

连接抖动检测:自动封禁那些被检测到短时间内频繁登录的客户端


共享订阅

相当于消费组,每个组消费同一个主题里所有的消息,一个组里面使用轮循等策略消费消息(并行消费及高可用性)
带群组的共享订阅:$share/<group-name>/{TopicName}
不带群组格式:$queue/{TopicName} (相当于同一个消费组,组内并行消费)


排它订阅

一个主题同一时刻仅被允许存在一个订阅者 $exclusive/{TopicName}


使用docker compose创建Kafka测试环境

启动命令:docker compose -f docker-compose-kafka.yml up -d

services:
  zookeeper:
    image: wurstmeister/zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
    networks:
      - kafka-net

  kafka:
    image: wurstmeister/kafka
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.131:9092,PLAINTEXT_HOST://192.168.1.131:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    networks:
      - kafka-net

  kafka-eagle:
    image: nickzurich/efak
    container_name: kafka-eagle
    ports:
      - "8048:8048"
    environment:
      EFAK_DB: h2
      CLUSTER_ZK_LIST: zookeeper:2181/cluster1
      CLUSTER_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    depends_on:
      - kafka
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge

volumes:
  kafka-data: {}
  zookeeper-data: {}

Kafka管理后台
http://192.168.1.131:8048
admin/123456
创建主题:Topics =》Create,Topic Name:test_mqtt_topic

Java代码发送kafka消息

@SpringBootTest
class MqttKafkaDemoApplicationTests {
	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	@Test
	public void testSendMsg() {
		kafkaTemplate.send("test_mqtt_topic", "hello,kafka");
	}
}

在kafka eagle的Topics =》List,点击进test_mqtt_topic里面,在右侧的Preview中,可以看到最近发送的消息

关闭服务 docker compose -f docker-compose-kafka.yml down


数据集成

使用Sink与Source与外部数据系统对接

Sink用于将消息从broker发送到外部数据系统
Source用于从外部系统接收消息

规则引擎:数据来源、数据处理过程、处理结果去向

数据集成示例

将客户端发往t/a主题中的消息输出到EMQX的控制台

#Dashboard =》集成 =》规则 =》创建,SQL编辑器
SELECT * FROM "t/a"
#数据输入为消息主题t/a
#动作输出选择控制台输出

SQL语法介绍

SELECT <字段名> FROM <主题> [WHERE <条件>]

SELECT a,b FROM 't/#'		//未知的列会返回undefined
SELECT * FROM '#' WHERE username = 'abc'
SELECT clientid as cid FROM '#' WHERE cid = 'abc'
SELECT clientid as cid FROM '#' WHERE username = 'abc'
SELECT clientid as cid, payload, topic, qos FROM "t/a"


FOREACH <字段名> [DO <条件>] [INCASE <条件>] FROM <主题> [WHERE <条件>]
FOREACH 处理数组数据

FOREACH payload.sensors as e
DO clientid, e.name as name, e.idx as idx
INCASE e.idx >= 1                       #对DO选择出来的某个字段施加条件过滤
from "t/b"

t/b测试数据

{
	"date": "2025-04-15",
	"sensors": [
		{"name": "a", "idx": 0},
		{"name": "b", "idx": 1},
		{"name": "c", "idx": 2}
	]
}

添加规则
FOREACH payload.sensors from "t/b"

添加动作类型:消息重发布
主题:sensors/${item.idx}
内容:${item.name}

添加3个订阅:sensors/1,sensors/2,sensors/3

使用DO简化输出结果
FOREACH payload.sensors as e
DO e.idx as idx, e.name as name
 from "t/c"

再次添加动作类型:消息重发布
主题:sensors/ i d x 内容: {idx} 内容: idx内容:{name}

CASE-WHEN语句
SELECT CASE WHEN payload.x < 0 THEN 0
WHEN payload.x > 7 THEN 7
ELSE payload.x
END as x
FROM "t/abc"

内置函数

SELECT abs(-1) as x,
concat(payload.msg, ' goods') as name
FROM "t/aaa"

Webhook

Webhook将EMQX客户端消息和事件发送到外部HTTP服务中

@RestController
@RequestMapping("webHook")
public class WebHookController {
    @PostMapping("notify")
    public void notifyMsg(@RequestBody Map<Object, Object> body) {
        System.out.println(body);
    }
}

在Dashboard =》集成 =》Webhook 中创建 Webhook

名称notify_WH_D,触发器:消息发布,过滤主题:t/1
http://192.168.0.199:8080/webHook/notify


docker查看挂载的磁盘信息

docker volume ls
docker volume inspect emqx_log

在Vue中使用MQTT

npm create vite@latest
npm install
npm install element-plus --save
npm install mqtt --save

修改main.js

import { createApp } from 'vue'
import './style.css'
import App from './App.vue'
import ElementPlus from 'element-plus'
import 'element-plus/dist/index.css'

const app = createApp(App)
app.use(ElementPlus)
app.mount('#app')

添加页面MqttDemo.vue,连接emqx订阅发送消息

<script setup>
import { ref, reactive } from 'vue'
import mqtt from 'mqtt'

const qosList = [0, 1, 2]
// 定义连接参数的对象
const connectInfo = ref({
    protocol: 'ws',
    host: '192.168.1.131',
    port: '8083',
    clientId: 'emqx_vue_client_' + Math.random().toString().substring(2, 8),
    username: 'zhangsan',
    password: '123456'
})

const clientInitData = ref({
    connected: false
})
const client = ref({})
const createConnection = () => {
    const { protocol, host, port, ...options } = connectInfo.value
    const connectUrl = `${protocol}://${host}:${port}/mqtt`
    client.value = mqtt.connect(connectUrl, options)
    clientInitData.value.connected = true
    console.log('连接建立成功了')
}

const closeConnection = () => {
    client.value.end(false, () => {
        clientInitData.value.connected = false
        console.log('连接关闭成功了')
    })

}

const subscriptionInfo = ref({
    topic: '',
    qos: 0
})

const receivedMessages = ref({})
const subscriptionInitData = ref({
    subscription: false
})
const subscriptionTopicHandler = () => {
    const { topic, qos } = subscriptionInfo.value
    client.value.subscribe(topic, { qos }, (error, res) => {
        if (error) {
            console.log('主题订阅失败了', error)
            return
        }
        subscriptionInitData.value.subscription = true
        //给链接对象注册一个接收消息的事件
        client.value.on('message', (topic, message) => {
            console.log('接收到消息:', topic, message)
            receivedMessages.value = topic + '--->' + message
        })
    })
}

const unSubscriptionTopicHandler = () => {
    const { topic, qos } = subscriptionInfo.value
    client.value.unsubscribe(topic, { qos }, (error, res) => {
        if (error) {
            console.log('主题取消订阅失败了', error)
            return
        }
        subscriptionInitData.value.subscription = false
    })
}

const publishInfo = ref({
    topic: '',
    qos: 0,
    payload: ''
})

const doPublish = () => {
    const { topic, qos, payload } = publishInfo.value
    client.value.publish(topic, payload, { qos }, (error, res) => {
        if (error) {
            console.log('发送消息失败了', error)
            return
        }
    })
}

</script>

<template>
    <el-card>
        <h4>配置信息</h4>
        <el-form label-width="120px">
            <el-row :gutter="20">
                <el-col :span="8">
                    <el-form-item label="协议" prop="protocol">
                        <el-select v-model="connectInfo.protocol">
                            <el-option label="ws://" value="ws"></el-option>
                            <el-option label="wss://" value="wss"></el-option>
                        </el-select>
                    </el-form-item>
                </el-col>
                <el-col :span="8">
                    <el-form-item label="主机地址" prop="host">
                        <el-input v-model="connectInfo.host"></el-input>
                    </el-form-item>
                </el-col>
                <el-col :span="8">
                    <el-form-item label="端口" prop="port">
                        <el-input v-model="connectInfo.port"></el-input>
                    </el-form-item>
                </el-col>
            </el-row>

            <el-row :gutter="20">
                <el-col :span="8">
                    <el-form-item label="clientId" prop="clientId">
                        <el-input v-model="connectInfo.clientId"></el-input>
                    </el-form-item>
                </el-col>
                <el-col :span="8">
                    <el-form-item label="用户名" prop="username">
                        <el-input v-model="connectInfo.username"></el-input>
                    </el-form-item>
                </el-col>
                <el-col :span="8">
                    <el-form-item label="密码" prop="password">
                        <el-input v-model="connectInfo.password"></el-input>
                    </el-form-item>
                </el-col>
            </el-row>

            <el-row :gutter="20">
                <el-col :span="24">
                    <el-button type="primary" :disabled="clientInitData.connected"
                        @click="createConnection">建立连接</el-button>
                    <el-button type="danger" :disabled="!clientInitData.connected"
                        @click="closeConnection">断开连接</el-button>
                </el-col>
            </el-row>
        </el-form>
    </el-card>

    <el-card>
        <h4>订阅主题</h4>
        <el-form label-width="120px">
            <el-row :gutter="20">
                <el-col :span="8">
                    <el-form-item label="Topic" prop="topic">
                        <el-input v-model="subscriptionInfo.topic"></el-input>
                    </el-form-item>
                </el-col>
                <el-col :span="8">
                    <el-form-item label="Qos" prop="qos">
                        <el-select v-model="subscriptionInfo.qos">
                            <el-option v-for="item in qosList" :label="item" :value="item"></el-option>
                        </el-select>
                    </el-form-item>
                </el-col>
                <el-col :span="8">
                    <el-button type="primary" :disabled="subscriptionInitData.subscription"
                        @click="subscriptionTopicHandler">订阅主题</el-button>
                    <el-button type="warning" :disabled="!subscriptionInitData.subscription"
                        @click="unSubscriptionTopicHandler">取消订阅</el-button>
                </el-col>
            </el-row>
        </el-form>
    </el-card>

    <el-card>
        <h4>发布消息</h4>
        <el-form label-width="120px">
            <el-row :gutter="20">
                <el-col :span="8">
                    <el-form-item label="Topic" prop="topic">
                        <el-input v-model="publishInfo.topic"></el-input>
                    </el-form-item>
                </el-col>
                <el-col :span="8">
                    <el-form-item label="Qos" prop="field5">
                        <el-select v-model="publishInfo.qos">
                            <el-option v-for="item in qosList" :label="item" :value="item"></el-option>
                        </el-select>
                    </el-form-item>
                </el-col>
                <el-col :span="8">
                    <el-form-item label="Payload" prop="payload">
                        <el-input v-model="publishInfo.payload"></el-input>
                    </el-form-item>
                </el-col>
            </el-row>

            <el-row :gutter="20">
                <el-col :span="24">
                    <el-button type="primary" @click="doPublish">发布消息</el-button>
                </el-col>
            </el-row>
        </el-form>
    </el-card>

    <el-card>
        <h4>接收到的消息</h4>
        <el-form>
            <el-input v-model="receivedMessages" style="width: 98%" :rows="5" type="textarea" />
        </el-form>
    </el-card>

</template>

<style scoped>
.el-row {
    margin-bottom: 20px;
}

.el-col {
    border-radius: 4px;
}
</style>

Java中使用客户端连接EMQX

  1. 在pom.xml中引入依赖
		<dependency>
			<groupId>org.eclipse.paho</groupId>
			<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
			<version>1.2.2</version>
		</dependency>
  1. 添加测试类
@SpringBootTest
class MqttSpringDemoApplicationTests {

	@Test
	void contextLoads() {
	}

	@Test
	public void testSendMsg() throws MqttException {
		String serverURI = "tcp://192.168.1.131:1883";
		String clientId = "paho_client_123";
		MemoryPersistence memoryPersistence = new MemoryPersistence();
		MqttClient mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);
		MqttConnectOptions options = new MqttConnectOptions();
		options.setUserName("zhangsan");
		options.setPassword("123456".toCharArray());
		// 创建新的连接
		options.setCleanSession(true);
		mqttClient.connect(options);
		System.out.println("连接创建成功了");

		MqttMessage mqttMessage = new MqttMessage("hello mqtt".getBytes());
		mqttMessage.setQos(0);
		mqttClient.publish("a/c", mqttMessage);
		System.out.println("消息发送成功");
		mqttClient.disconnect();
		mqttClient.close();
	}

	@Test
	public void testReceiveMsg() throws MqttException {
		String serverURI = "tcp://192.168.1.131:1883";
		String clientId = "paho_client_123";
		MemoryPersistence memoryPersistence = new MemoryPersistence();
		MqttClient mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);
		MqttConnectOptions options = new MqttConnectOptions();
		options.setUserName("zhangsan");
		options.setPassword("123456".toCharArray());
		// 创建新的连接
		options.setCleanSession(true);
		mqttClient.connect(options);
		System.out.println("连接创建成功了");

		mqttClient.subscribe("a/d", 2);
		mqttClient.setCallback(new MqttCallback() {
			@Override
			public void connectionLost(Throwable throwable) {

			}

			@Override
			public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
				System.out.println("接收到消息 " + topic + " " + new String(mqttMessage.getPayload()));
			}

			@Override
			public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

			}
		});
		while (true);
	}

	@Test
	public void testCreateConnection() throws MqttException {
		String serverURI = "tcp://192.168.1.131:1883";
		String clientId = "paho_client_123";
		MemoryPersistence memoryPersistence = new MemoryPersistence();
		MqttClient mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);
		MqttConnectOptions options = new MqttConnectOptions();
		options.setUserName("zhangsan");
		options.setPassword("123456".toCharArray());
		// 创建新的连接
		options.setCleanSession(true);
		mqttClient.connect(options);
		System.out.println("连接创建成功了");
		while (true);
	}
}

SpringBoot中通过SpringIntegration接入EMQX

在SpringBoot中通过SpringIntegration接入EMQX(MQTT消息服务器)可以更高效地管理数据流
利用SpringIntegration的通道(Channel)、适配器(Adapter)和消息处理器(MessageHandler)实现复杂的消息路由、转换和聚合。

关键组件说明

组件作用
MqttPahoMessageDrivenChannelAdapter入站适配器:订阅 EMQX 主题并将消息传递到输入通道。
MqttPahoMessageHandler出站适配器:从输出通道接收消息并发布到 EMQX 主题。
MessageChannel通道:连接适配器和处理器的管道,支持同步 / 异步模式。
@ServiceActivator服务激活器:处理通道中的消息并触发业务逻辑。
@MessagingGateway消息网关:提供接口简化消息发送到出站通道。
  1. pom.xml中引入依赖
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
			<version>5.4.3</version>
		</dependency>

		<dependency>
			<groupId>com.baomidou</groupId>
			<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
			<version>3.5.10.1</version>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.30</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba.fastjson2</groupId>
			<artifactId>fastjson2</artifactId>
			<version>2.0.55</version>
		</dependency>

		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
			<version>4.5.13</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<scope>provided</scope>
		</dependency>
	</dependencies>
  1. 配置项目参数
    修改application.properties
spring.application.name=spring-integration-mqtt
spring.mqtt.username=zhangsan
spring.mqtt.password=123456
spring.mqtt.url=tcp://192.168.1.131:1883
spring.mqtt.subClientId=sub_client_id_123
spring.mqtt.subTopic=iot/lamp/line,iot/lamp/device/status
spring.mqtt.pubClientId=pub_client_id_123
spring.mqtt.apiUrl=http://192.168.1.131:18083
spring.mqtt.secretKey=6l6rxxo2trs3QJjfK0OUhPIzjKlOqAOlnhniCuhonsI
spring.mqtt.apiKey=99d1ecc382aa4d58

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.1.131:3306/lamp_test?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root

mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.mapper-locations=classpath*:/mapper/*Mapper.xml

httpclient.max-total=200
httpclient.default-max-per-route=20
httpclient.connect-timeout=10000
httpclient.socket-timeout=10000

配置类加载数据

@Configuration
public class MqttConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
        options.setCleanSession(true);
        options.setUserName(mqttConfigProperties.getUsername());
        options.setPassword(mqttConfigProperties.getPassword().toCharArray());
        clientFactory.setConnectionOptions(options);

        return clientFactory;
    }
}
@Configuration
public class HttpClientConfig {
    @Value("${httpclient.max-total}")
    private int maxTotal;

    @Value("${httpclient.default-max-per-route}")
    private int maxPerRoute;

    @Value("${httpclient.connect-timeout}")
    private int connectTimeout;

    @Value("${httpclient.socket-timeout}")
    private int socketTimeout;

    @Bean
    public PoolingHttpClientConnectionManager poolingHttpClientConnectionManager() {
        PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager();
        manager.setMaxTotal(maxTotal);
        manager.setDefaultMaxPerRoute(maxPerRoute);
        return manager;
    }

    @Bean
    public CloseableHttpClient httpClient(PoolingHttpClientConnectionManager manager) {
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(connectTimeout)
                .setSocketTimeout(socketTimeout)
                .build();

        return HttpClients.custom()
                .setConnectionManager(manager)
                .setDefaultRequestConfig(requestConfig)
                .build();
    }
}

自动加载配置信息,创建文件
META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
#内容每行是一个类
com.achieve.mqtt.domain.MqttConfigProperties

  1. 入站适配器:订阅消息
@Configuration
public class MqttInboundConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;
    @Autowired
    private MqttPahoClientFactory mqttClientFactory;
    @Autowired
    private ReceiverMessageHandler receiverMessageHandler;

    // 定义消息通道(订阅消息)
    @Bean
    public MessageChannel messageInboundChannel() {
        return new DirectChannel();
    }

    /**
     * 配置入站适配器,设置订阅主题,以及指定消息的相关属性
     */
    @Bean
    public MessageProducer messageProducer() {
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        mqttConfigProperties.getUrl(),
                        mqttConfigProperties.getSubClientId(),
                        mqttClientFactory,
                        mqttConfigProperties.getSubTopic().split(","));
        mqttPahoMessageDrivenChannelAdapter.setQos(1);
        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());
        return mqttPahoMessageDrivenChannelAdapter;
    }

    // 服务激活器:处理输入通道消息
    @Bean
    @ServiceActivator(inputChannel = "messageInboundChannel")
    public MessageHandler messageHandler() {
        return receiverMessageHandler;
    }
}
@Component
public class ReceiverMessageHandler implements MessageHandler {

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        String payload = message.getPayload().toString();
        MessageHeaders headers = message.getHeaders();
        Object topicName = headers.get("mqtt_receivedTopic").toString();
        System.out.println(payload);
        System.out.println(message);
        System.out.println(topicName);
    }
}
  1. 从输出通道接收消息并发布到EMQX主题
@Configuration
public class MqttOutboundConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;
    @Autowired
    private MqttPahoClientFactory mqttClientFactory;

    // 定义消息通道(发布消息)
    @Bean
    public MessageChannel messageOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "messageOutboundChannel")
    public MessageHandler mqttOutboundMessageHandler() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                mqttConfigProperties.getUrl(),
                mqttConfigProperties.getPubClientId(),
                mqttClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("default");
        messageHandler.setDefaultQos(0);
        return messageHandler;
    }
}
  1. 消息网关:提供接口简化消息发送到出站通道
//网关接口发送消息(用于发送消息到出站通道)
@MessagingGateway(defaultRequestChannel = "messageOutboundChannel")
public interface MqttGateway {
    void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);

    void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload);
}
@Component
public class MqttMessageSender {
    @Autowired
    private MqttGateway mqttGateway;

    public void sendMsg(String topic, String payload) {
        mqttGateway.sendMsgToMqtt(topic, payload);
    }

    public void sendMsg(String topic, int qos, String payload) {
        mqttGateway.sendMsgToMqtt(topic, qos, payload);
    }
}
  1. 测试发送消息
@SpringBootTest
class SpringIntegrationMqttApplicationTests {

	@Autowired
	private MqttMessageSender mqttMessageSender;

	@Test
	public void testSendMsg() {
		mqttMessageSender.sendMsg("a/e", "hello world");
	}
}
  1. 查询设备在线状态
@RestController
@RequestMapping("/api/lamp")
@Slf4j
public class LampApiController {
    @Autowired
    private MqttMessageSender mqttMessageSender;
    @Autowired
    private MqttConfigProperties mqttConfigProperties;
    @Autowired
    private CloseableHttpClient httpClient;

    @GetMapping(value = "{deviceId}/{status}")
    public String sendStatusLampMsg(@PathVariable String deviceId, @PathVariable Integer status) {
        Map<String, Object> map = Map.of("deviceId", deviceId, "status", status);
        String json = JSON.toJSONString(map);
        mqttMessageSender.sendMsg("iot/lamp/server/line", json);
        return "ok";
    }

    /**
     * 查询设备是否在线,v4和v5版本返回值不同;认证使用Basic方式
     * @param clientId
     * @return
     */
    @GetMapping(value = "online/{clientId}")
    public boolean isClientOnline(@PathVariable String clientId) {
        String apiUrl = mqttConfigProperties.getApiUrl() + "/api/v5/clients/" + clientId;
        HttpGet httpGet = new HttpGet(apiUrl);
        httpGet.setHeader(HttpHeaders.ACCEPT, "application/json");
        String auth = Base64.getEncoder()
                .encodeToString((mqttConfigProperties.getApiKey() + ":" + mqttConfigProperties.getSecretKey()).getBytes());
        httpGet.setHeader("Authorization", "Basic " + auth);

        try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
            int statusCode = response.getStatusLine().getStatusCode();
            HttpEntity entity = response.getEntity();

            if (statusCode == HttpStatus.SC_OK && entity != null) {
                String json = EntityUtils.toString(entity, StandardCharsets.UTF_8);
                log.info("isClientOnline data {}", json);
                JSONObject jsonObject = JSON.parseObject(json);
                return jsonObject.getBoolean("connected");
            } else if (statusCode == HttpStatus.SC_NOT_FOUND) {
                return false; // 设备不存在
            } else {
                throw new RuntimeException("EMQX API 请求失败: HTTP " + statusCode);
            }
        } catch (IOException e) {
            throw new RuntimeException("网络通信异常", e);
        }
    }
}

测试地址及数据
http://localhost:8080/api/lamp/online/device-123

#上线  
iot/lamp/line
{
	"deviceId": "device-123456",
	"status": 1
}


#服务器下发指令 http://localhost:8080/api/lamp/device-123456/1
iot/lamp/server/line
{
	"deviceId": "device-123456",
	"status": 1
}

#上报状态
iot/lamp/device/status
{
	"deviceId": "device-123456",
	"status": 1
}

源代码地址
https://gitee.com/galen.zhang/mqtt-demo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2340396.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探寻Gson解析遇到不存在键值时引发的Kotlin的空指针异常的原因

文章目录 一、问题背景二、问题原因三、问题探析Kotlin空指针校验Gson.fromJson(String json, Class<T> classOfT)TypeTokenGson.fromJson(JsonReader reader, TypeToken<T> typeOfT)TypeAdapter 和 TypeAdapterFactoryReflectiveTypeAdapterFactoryRecordAdapter …

冰川流域提取分析——ArcGIS pro

一、河网提取和流域提取视频详细GIS小熊 || 6分钟学会水文分析—河网提取&#xff08;以宜宾市为例&#xff09;_哔哩哔哩_bilibili 首先你要生成研究区域DEM&#xff0c;然后依次是填洼→流向→流量→栅格计算器→河网分级→栅格河网矢量化&#xff08;得到河网.shp&#xff…

wordpress 垂直越权(CVE=2021-21389)漏洞复现详细教程

关于本地化搭建vulfocus靶场的师傅可以参考我置顶文章 KALI搭建log4j2靶场及漏洞复现全流程-CSDN博客https://blog.csdn.net/2301_78255681/article/details/147286844 描述: BuddyPress 是一个用于构建社区站点的开源 WordPress 插件。在 7.2.1 之前的 5.0.0 版本的 BuddyP…

MySQL 线上大表 DDL 如何避免锁表(pt-online-schema-change)

文章目录 1、锁表问题2、pt-online-schema-change 原理3、pt-online-schema-change 实战3.1、准备数据3.2、安装工具3.3、模拟锁表3.4、解决锁表 1、锁表问题 在系统研发过程中&#xff0c;随着业务需求千变万化&#xff0c;避免不了调整线上MySQL DDL数据表的操作&#xff0c…

剑指offer经典题目(五)

目录 栈相关 二叉树相关 栈相关 题目一&#xff1a;定义栈的数据结构&#xff0c;请在该类型中实现一个能够得到栈中所含最小元素的 min 函数&#xff0c;输入操作时保证 pop、top 和 min 函数操作时&#xff0c;栈中一定有元素。OJ地址 图示如下。 主要思想&#xff1a;我们…

3、排序算法1---按考研大纲做的

一、插入排序 1、直接插入排序 推荐先看这个视频 1.1、原理 第一步&#xff0c;索引0的位置是有序区&#xff08;有序区就是有序的部分&#xff0c;刚开始就只有第一个数据是有序的&#xff09;。第二步&#xff0c;将第2个位置到最后一个位置的元素&#xff0c;依次进行排…

llama-webui docker实现界面部署

1. 启动ollama服务 [nlp server]$ ollama serve 2025/04/21 14:18:23 routes.go:1007: INFO server config env"map[OLLAMA_DEBUG:false OLLAMA_FLASH_ATTENTION:false OLLAMA_HOST: OLLAMA_KEEP_ALIVE:24h OLLAMA_LLM_LIBRARY: OLLAMA_MAX_LOADED_MODELS:4 OLLAMA_MAX_…

Linux的Socket开发补充

是listen函数阻塞等待连接&#xff0c;还是accept函数阻塞等待连接&#xff1f; 这两个函数的名字&#xff0c;听起来像listen一直在阻塞监听&#xff0c;有连接了就accept&#xff0c;但其实不是的。 调用listen()后&#xff0c;程序会立即返回&#xff0c;继续执行后续代码&a…

Spring-AOP分析

Spring分析-AOP 1.案例引入 在上一篇文章中&#xff0c;【Spring–IOC】【https://www.cnblogs.com/jackjavacpp/p/18829545】&#xff0c;我们了解到了IOC容器的创建过程&#xff0c;在文末也提到了AOP相关&#xff0c;但是没有作细致分析&#xff0c;这篇文章就结合示例&am…

【专业解读:Semantic Kernel(SK)】大语言模型与传统编程的桥梁

目录 Start:什么是Semantic Kernel&#xff1f; 一、Semantic Kernel的本质&#xff1a;AI时代的操作系统内核 1.1 重新定义LLM的应用边界 1.2 技术定位对比 二、SK框架的六大核心组件与技术实现 2.1 内核&#xff08;Kernel&#xff09;&#xff1a;智能任务调度中心 2…

你学会了些什么211201?--http基础知识

概念 HTTP–Hyper Text Transfer Protocol&#xff0c;超文本传输协议&#xff1b;是一种建立在TCP上的无状态连接&#xff08;短连接&#xff09;。 整个基本的工作流程是&#xff1a;客户端发送一个HTTP请求&#xff08;Request &#xff09;&#xff0c;这个请求说明了客户端…

每天学一个 Linux 命令(29):tail

​​可访问网站查看,视觉品味拉满: http://www.616vip.cn/29/index.html tail 命令用于显示文件的末尾内容,默认显示最后 10 行。它常用于实时监控日志文件或查看文件的尾部数据。以下是详细说明和示例: 命令格式 tail [选项] [文件...]常用选项 选项描述-n <NUM> …

【形式化验证基础】活跃属性Liveness Property和安全性质(Safety Property)介绍

文章目录 一、Liveness Property1、概念介绍2、形式化定义二、Safety Property1. 定义回顾2. 核心概念解析3. 为什么强调“有限前缀”4. 示例说明4.1 示例1:交通信号灯系统4.2 示例2:银行账户管理系统5. 实际应用的意义三. 总结一、Liveness Property 1、概念介绍 在系统的…

PI0 Openpi 部署(仅测试虚拟环境)

https://github.com/Physical-Intelligence/openpi/tree/main 我使用4070tisuper, 14900k,完全使用官方默认设置&#xff0c;没有出现其他问题。 目前只对examples/aloha_sim进行测试&#xff0c;使用docker进行部署, 默认使用pi0_aloha_sim模型(但是文档上没找到对应的&…

计算机视觉——利用AI幻觉检测图像是否是生成式算生成的图像

概述 俄罗斯的新研究提出了一种非常规方法&#xff0c;用于检测不真实的AI生成图像——不是通过提高大型视觉-语言模型&#xff08;LVLMs&#xff09;的准确性&#xff0c;而是故意利用它们的幻觉倾向。 这种新方法使用LVLMs提取图像的多个“原子事实”&#xff0c;然后应用自…

FlaskRestfulAPI接口的初步认识

FlaskRestfulAPI 介绍 记录学习 Flask Restful API 开发的过程 项目来源&#xff1a;【Flask Restful API教程-01.Restful API介绍】 我的代码仓库&#xff1a;https://gitee.com/giteechaozhi/flask-restful-api.git 后端API接口实现功能&#xff1a;数据库访问控制&#xf…

CSS预处理工具有哪些?分享主流产品

目前主流的CSS预处理工具包括&#xff1a;Sass、Less、Stylus、PostCSS等。其中&#xff0c;Sass是全球使用最广泛的CSS预处理工具之一&#xff0c;以强大的功能、灵活的扩展性以及完善的社区生态闻名。Sass通过增加变量、嵌套、混合宏&#xff08;mixin&#xff09;等功能&…

AI 速读 SpecReason:让思考又快又准!

在大模型推理的世界里&#xff0c;速度与精度往往难以兼得。但今天要介绍的这篇论文带来了名为SpecReason的创新系统&#xff0c;它打破常规&#xff0c;能让大模型推理既快速又准确&#xff0c;大幅提升性能。想知道它是如何做到的吗&#xff1f;快来一探究竟&#xff01; 论…

Qt通过ODBC和QPSQL两种方式连接PostgreSQL或PolarDB PostgreSQL版

一、概述 以下主要在Windows下验证连接PolarDB PostgreSQL版&#xff08;阿里云兼容 PostgreSQL的PolarDB版本&#xff09;。Linux下类似&#xff0c;ODBC方式则需要配置odbcinst.ini和odbc.ini。 二、代码 以下为完整代码&#xff0c;包含两种方式连接数据库&#xff0c;并…

MobaXterm连接Ubuntu(SSH)

1.查看Ubuntu ip 打开终端,使用指令 ifconfig 由图可知ip地址 2.MobaXterm进行SSH连接 点击session,然后点击ssh,最后输入ubuntu IP地址以及用户名