Java 使用 EMQX 实现物联网 MQTT 通信

news2025/2/1 1:55:23

一、介绍

1、MQTT

MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(loT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

特点:
使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
对负载内容屏蔽的消息传输;
使用 TCP/IP 提供网络连接;
有三种消息发布服务质量:
小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

2、EMQX

EMQX 是一个「无限连接,任意集成,随处运行」大规模分布式物联网接入平台。
EMQX 企业版提供一体化的分布式 MQTT 消息服务和强大的 IoT 规则引擎,为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业快速构建关键业务的 IoT 平台与应用。附下载地址: https://www.emqx.com/zh/try?product=enterprise 可以自行下载对应版本运行
在这里插入图片描述

优势:
海量连接:单节点支持 500 万 MQTT 设备连接,集群可水平扩展至支持 1 亿并发的 MQTT 连接。
高可靠:弹性伸缩,无单点故障。内置 RocksDB 可靠地持久化 MQTT 消息,确保无数据损失。
数据安全:端到端数据加密(支持国密),细粒度访问控制,保障数据安全,满足企业合规需求。
多协议:支持 MQTT、HTTP、QUIC、WebSocket、LwM2M/CoAP 或专有协议连接任何设备。
高性能:单节点支持每秒实时接收、处理与分发数百万条的 MQTT 消息。毫秒级消息交付时延。
易运维:图形化配置、操作与管理,实时监测运行状态。支持 MQTT 跟踪进行端到端问题分析。

3、Mria 集群架构​

支持全新的 Mria 集群架构,在此架构下 EMQX 水平扩展性得到指数级提升,单个集群可以轻松支持 1 亿 MQTT 连接,这使得 EMQX 5.0 成为目前全球最具扩展性的 MQTT Broker。

在构建满足用户业务需求的更大规模集群的同时,Mria 架构还能够降低大规模部署下的脑裂风险以及脑裂后的影响,以提供更加稳定可靠的物联网数据接入服务。

具体可以查看官方文档: https://docs.emqx.com/zh/enterprise/v5.1/deploy/cluster/create-cluster.html

4、MQTTX

MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。MQTTX 的用户界面 UI 采用聊天式设计,使得操作逻辑更加简明直观。它支持用户快速创建和保存多个 MQTT 连接,便于测试 MQTT/MQTTS 连接,以及 MQTT 消息的订阅和发布。

在这里插入图片描述

主要功能
采用聊天界面设计,使得操作更加简单明了
跨平台兼容,支持在 Windows,macOS,Linux 系统上运行
100% 兼容 MQTT v5.0,v3.1.1 和 v3.1 协议
订阅的 MQTT 主题支持自定义颜色标签
支持单向和双向 SSL 认证,同时支持 CA 和自签名证书
支持通过 WebSocket 连接 MQTT 服务器
支持 Hex, Base64, JSON, Plaintext 等 Payload 格式转换
自定义脚本支持模拟 MQTT 发布/订阅测试
提供完整的日志记录功能
多语言支持:简体中文、英语、日语、土耳其语及匈牙利语 ??? ??? ??? ??? ???
自由切换 Light、Dark、Night 三种主题模式

二、实战

1、引入maven依赖:

<dependency>
  <groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.2</version>
</dependency>
# EMQX配置
emqx:
  # EMQX服务地址,端口号默认18083
  url: http://127.0.0.1:18083
  # 认证用户名
  username: admin
  # 密码
  password: admin123456
/**
 * EMQX 登录账号密码配置
 */

@Data
@Configuration
@ConfigurationProperties(prefix = "emqx")
public class EmqxConfig {

	private String url;

	private String username;

	private String password;
}
spring:
  # MQTT配置
  mqtt:
    # MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开
    host-url: tcp://127.0.0.1:1883
    # 用户名
    username: admin
    # 密码
    password: admin123456
    # 客户端id(不能重复)
    client-id: real-mqtt-client
    # MQTT默认的消息推送主题,实际可在调用接口时指定
    default-topic: topic
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
@Data
public class MqttConfig {

	private String username;

	private String password;

	private String hostUrl;

	private String clientId;

	private String defaultTopic;
}

MQTT客户端连接工厂类

@Slf4j
@Component
public class MqttFactory {

	public static ConcurrentHashMap<String, MqttClient> clientMap = new ConcurrentHashMap<>();

	@Autowired
	private MqttConfig mqttConfig;

	@Autowired
	private RealPersonAccessDeviceMapper realPersonAccessDeviceMapper;

	/**
	 * 在bean初始化后连接到服务器
	 */
	@PostConstruct
	public void init() {
		String mqttStartFlag = ParamResolver.getStr(RealCommonConstants.MQTT_START_FLAG);
		if (StrUtil.equals(mqttStartFlag, CommonConstants.SYS_YES_NO_Y)) {
			// 初始化订阅主题
			initSubscribeTopic(getInstance());
		}
	}

	/**
	 * 初始化客户端
	 */
	public MqttClient getInstance() {
		MqttClient client = null;
		if (clientMap.get(mqttConfig.getClientId()) == null) {
			try {
				client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId());
				// MQTT配置对象
				MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
				// 设置自动重连, 其它具体参数可以查看MqttConnectOptions
				mqttConnectOptions.setAutomaticReconnect(true);
				// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
				// mqttConnectOptions.setCleanSession(true);
				// 设置超时时间 单位为秒
				mqttConnectOptions.setConnectionTimeout(10);
				mqttConnectOptions.setUserName(mqttConfig.getUsername());
				mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
				// mqttConnectOptions.setServerURIs(new String[]{url});
				// 设置会话心跳时间 单位为秒
				mqttConnectOptions.setKeepAliveInterval(10);
				// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
				// mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false);
				if (!client.isConnected()) {
					client.connect(mqttConnectOptions);
				}
				client.setCallback(new MqttCallBack());
				log.info("MQTT创建client成功={}", JSONObject.toJSONString(client));
				clientMap.put(mqttConfig.getClientId(), client);
			} catch (MqttException e) {
				log.error("MQTT连接消息服务器[{}]失败", mqttConfig.getClientId() + "-" + mqttConfig.getHostUrl());
			}
		} else {
			client = clientMap.get(mqttConfig.getClientId());
			log.info("MQTT从map里获取到client,clientId=" + mqttConfig.getClientId());
			// TODO 已采用自动重连策略
//			log.info("MQTT从map里获取到client={}", JSONObject.toJSONString(client));
//			if (!client.isConnected()) {
//				initSubscribeTopic(client);
			// 如果缓存里的client已经断开,则清除该缓存,再重新创建客户端连接
//				clientMap.remove(mqttConfig.getClientId());
//				this.getInstance();
//			}
		}
		return client;
	}


	/**
	 * 初始化订阅主题
	 * <p>
	 * 消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
	 */
	public void initSubscribeTopic(MqttClient client) {
		// 查询所有宇泛mqtt设备并订阅主题
		List<RealPersonAccessDevice> deviceList = realPersonAccessDeviceMapper.selectList(Wrappers.<RealPersonAccessDevice>lambdaQuery()
				.eq(RealPersonAccessDevice::getDeviceProducerType, RealCommonConstants.DeviceProducerType.UNIUBI.getValue())
				.and(wrapper -> wrapper
						.eq(RealPersonAccessDevice::getProtocolType, RealCommonConstants.ProtocolType.MQTT.getValue())
						.or()
						.eq(RealPersonAccessDevice::getProtocolType, RealCommonConstants.ProtocolType.UMQTT.getValue()))
				.eq(RealPersonAccessDevice::getStatus, CommonConstants.STATUS_ENABLE));
		if (CollectionUtil.isNotEmpty(deviceList)) {
			// 订阅设备发布消息主题
			List<String> upstreamTopics = new ArrayList<>();
			List<Integer> upstreamQos = new ArrayList<>();
			for (RealPersonAccessDevice device : deviceList) {
				if (StrUtil.equals(device.getProtocolType(), RealCommonConstants.ProtocolType.MQTT.getValue())) {
					upstreamTopics.add(String.format(MqttCommonConstants.DEFAULT, device.getDeviceNo()));
					upstreamQos.add(1);
					upstreamTopics.add(String.format(MqttCommonConstants.UPSTREAM, device.getDeviceNo()));
					upstreamQos.add(0);
					upstreamTopics.add(String.format(MqttCommonConstants.WILL, device.getDeviceNo()));
					upstreamQos.add(1);
				} else if (StrUtil.equals(device.getProtocolType(), RealCommonConstants.ProtocolType.UMQTT.getValue())) {
					upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, device.getDeviceNo()));
					upstreamQos.add(1);
					upstreamTopics.add(String.format(UMqttCommonConstants.ONLINE));
					upstreamQos.add(1);
					upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, device.getDeviceNo()));
					upstreamQos.add(2);
					upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, device.getDeviceNo()));
					upstreamQos.add(0);
				}
			}
			try {
				client.subscribe(upstreamTopics.toArray(new String[upstreamTopics.size()]), upstreamQos.stream().mapToInt(Integer::intValue).toArray());
			} catch (MqttException e) {
				e.printStackTrace();
			}
		}
	}

}

业务方法

@Slf4j
@Data
@Configuration
public class UMqttClientService {

	private final MqttFactory mqttFactory;

	private final StringRedisTemplate redisTemplate;

	/**
	 * 订阅主题
	 */
	public void subscribeTopic(String deviceNo) {
		try {
			// 订阅设备发布消息主题
			List<String> upstreamTopics = new ArrayList<>();
			upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));
			upstreamTopics.add(UMqttCommonConstants.ONLINE);
			upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));
			upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));
			int[] upstreamQos = {1, 1, 2, 0};
			mqttFactory.getInstance().subscribe(upstreamTopics.toArray(new String[0]), upstreamQos);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 取消订阅主题
	 */
	public void stopSubscribeTopic(String deviceNo) {
		try {
			// 取消订阅设备发布消息主题
			List<String> upstreamTopics = new ArrayList<>();
			upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));
			upstreamTopics.add(UMqttCommonConstants.ONLINE);
			upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));
			upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));
			mqttFactory.getInstance().unsubscribe(upstreamTopics.toArray(new String[0]));
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 断开连接
	 */
	public void disConnect() {
		try {
			mqttFactory.getInstance().disconnect();
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 订阅主题
	 */
	public void subscribe(String topic, int qos) {
		try {
			mqttFactory.getInstance().subscribe(topic, qos);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 发布请求设备消息
	 *
	 * @param deviceNo 设备编号
	 * @param message  消息
	 */
	public void publish(String deviceNo, String message) {
		publish(1, false, String.format(UMqttCommonConstants.REQUEST, deviceNo), message);
	}

	/**
	 * 发布请求设备消息
	 */
	public void publish(UMqttPublishDate publishDate) {
		// 将消息id和方法名存到redis中:缓存3分钟
		redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),
				JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);
		publish(1, false, String.format(UMqttCommonConstants.REQUEST, publishDate.getDeviceNo()), publishDate.getMessage());
	}

	/**
	 * 发布响应设备消息
	 */
	public void publishResponse(UMqttPublishDate publishDate) {
		// 将消息id和方法名存到redis中:缓存3分钟
		redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),
				JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);
		publish(1, false, String.format(UMqttCommonConstants.RESPONSE, publishDate.getDeviceNo()), publishDate.getMessage());
	}

	/**
	 * 发布消息
	 *
	 * @param qos      qos
	 * @param retained retained
	 * @param topic    主题
	 * @param message  消息
	 */
	public void publish(int qos, boolean retained, String topic, String message) {
		log.info("发布消息topic:" + topic);
		log.info("发布消息message:" + message);
		MqttMessage mqttMessage = new MqttMessage();
		mqttMessage.setQos(qos);
		mqttMessage.setRetained(retained);
		mqttMessage.setPayload(message.getBytes());
		//主题的目的地,用于发布/订阅信息
		MqttTopic mqttTopic = mqttFactory.getInstance().getTopic(topic);
		//提供一种机制来跟踪消息的传递进度
		//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
		MqttDeliveryToken token;
		try {
			//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
			//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
			token = mqttTopic.publish(mqttMessage);
			token.waitForCompletion();
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/999957.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Netty核心原理:一、基础入门-01:初入JavaIO之门BIO、NIO、AIO实战练习

文章目录 一、前言介绍1.1 BIO&#xff1a;同步阻塞I/O模式1.2 NIO&#xff1a;同步非阻塞I/O模式1.3 AIO&#xff1a;异步非阻塞I/O模式 二、代码实现2.1 工程结构2.2 BIO&#xff1a;同步阻塞I/O实现2.2.1 BIO处理器2.2.2 BIO适配器2.2.3 BIO客户端处理器2.2.4 BIO客户端2.2.…

计算机网络第五章——传输层(上)

早知如此绊人心&#xff0c;何如当初莫相识 文章目录 前言 前言 虽然说是手机和手机之间的通信但是其实是手机之间的进程和进程之间的通信&#xff0c;所以这一章主要是研究进程之间通信的问题&#xff0c;在计算机网络中有一个重要的问题&#xff0c;在进行数据通信和资源共享…

【分享】golang windows 运行报错 undefined: syscall.SIGUSR1

在跟着煎鱼大佬学习 Golang-gin的时候&#xff0c;"在优雅的重启服务篇" ,为了gin服务的热更新&#xff0c;采用了 endlessfresh的方案&#xff0c;安装endless后无法在windows本地调试,然后报错。 (优雅的重启服务-地鼠文档优雅的重启服务-我不怎么喜欢左写写&#…

linux内核模块编译方法之模块编程详解

文章目录 一、模块传参二、模块依赖三、内核空间和用户空间四、执行流五、模块编程与应用编程的比较六、内核接口头文件查询总结 本期和大家主要分享的是驱动开发内核编译过程中对于模块是如何设计的&#xff0c;进行了详细的分享&#xff0c;从模块传参、模块依赖一直到内核空…

楼顶空地适合建造气膜体育馆吗?

众所周知&#xff0c;传统建筑的荷载太大&#xff0c;出于安全考虑&#xff0c;是不适合继续在楼顶加盖传统结构体育馆的&#xff0c;但是&#xff0c;气膜体育馆作为一种装配式建筑&#xff0c;它是可以在城市高空上建造一个轻盈又新颖独特的全天候气膜馆。 气膜体育馆作为一种…

小黑自己在家尝试涮牛排,肚子又开始了新一轮的胀气,喝到了酱香拿铁并烫了纹理发型体验一把的leetcode之旅:123. 买卖股票的最佳时机 III

动态规划1 class Solution:def maxProfit(self, prices: List[int]) -> int:# 数组长度n len(prices)if n < 2:return 0# 动态规划变量# 第一次买的价格first_price prices[0]# 第一次卖的收益first_cell 0# 第二次买的价格second_price prices[0]# 第二次卖second_…

STM32H750 HAL CUBEMX 时钟失败及死机无法下载问题解决

芯片采样电压设置&#xff0c;否则 无法运行 解决死机问题 设置swd 模式 短接 boot0 —vcc 3.3v即可正常下载

驱动开发,stm32mp157a开发板的led灯控制实验

1.实验目的 编写LED灯的驱动&#xff0c;在应用程序中编写控制LED灯亮灭的代码逻辑实现LED灯功能的控制&#xff1b; 2.LED灯相关寄存器分析 LED1->PE10 LED1亮灭&#xff1a; RCC寄存器[4]->1 0X50000A28 GPIOE_MODER[21:20]->01 (输出) 0X50006000 GPIOE_ODR[10]-&g…

5. HBase必知必会之理论进阶篇

HBase必知必会之理论进阶篇 1.1 集群搭建以及规模预测1.1.1 HBase集群搭建1.1.2 HBase集群规划 1.2 HBase重要的概念1.2.1 snapshot1.2.2 region 切分1.2.3 RIT1.2.4 HBase读优化1.2.4.1 客户端优化1.2.4.2 服务端优化1.2.4.3 hdfs 优化 1.2.5 HBase写优化1.2.5.1 客户端优化1.…

Linux centos7 bash编程训练__打印各类形状

利用for循环&#xff0c;打印各种不同的三角形、矩形和菱形。 主要是fort循环嵌套使用&#xff0c;及条件判断等。 因方法简单&#xff0c;不作更多解释&#xff0c;部分注释可以帮助初学者掌握代码。 下面列出代码&#xff0c;供参考。 #! /bin/bash ## 打印输出各种*型形…

中企出海,用火山引擎DataTester开启增长第一步

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 今年 Google 宣布其提供的A/B测试工具 Optimize 将在2023年9月30号停止服务。在全球化浪潮席卷下&#xff0c;越来越多的中国企业正在加速走向全球市场&#xff0c;…

使用 Webpack 从 0 到 1 构建 Vue3 项目 + ts

使用 Webpack 从 0 到 1 构建 Vue3 项目 1.初始化项目结构2.安装 webpack&#xff0c;补充智能提示3.初步编写 webpack.config.js3.1设置入口文件及出口文件3.2 指定 html 模板位置 4.配置 运行/打包 命令&#xff0c;首次打包项目5.添加 Vue 及相关配置5.1安装并引入 vue5.2 补…

一个详细且完整的公司局域网搭建案例,跟着操作!

局域网(Local Area Network&#xff0c;简称LAN)&#xff0c;用于将有限范围内&#xff08;例如一个实验室、一层办公楼或者校园&#xff09;的各种计算机、终端与外部设备互联成网。公司局域网怎么建立&#xff1f;首先来了解下不同规模企业网络组建方式。 10人以下企业网络组…

固定资产管理表怎么填写

在现代企业管理中&#xff0c;固定资产的管理是至关重要的环节。它不仅关系到企业运营的效率&#xff0c;也直接影响到企业的财务状况。因此&#xff0c;正确、有效地填写和管理固定资产管理表显得尤为重要。并提供一些创新的方法来优化这一过程。  让我们理解什么是固定资产…

Win10 cmd默认使用管理员身份运行的修改

一、在开始菜单搜索cmd&#xff0c;打开快捷方式文件位置 二、鼠标右键快捷方式&#xff0c;打开属性 三、选择高级&#xff0c;再勾选用管理员身份运行&#xff0c;点击确定即可

文心一言插件开发全流程,ERNIE-Bot-SDK可以调用文心一言的能力

文心一言插件开发 前言插件插件是什么工作原理申请开发权限 开始第一步&#xff1a;安装python第二步&#xff1a;搭建项目manifest 描述文件&#xff1a;ai-plugin.json插件服务描述文件&#xff1a;openapi.yaml开发自己的plugin-server 第三步&#xff1a;上传插件 SDK相关链…

记录一次开机内存分析的全过程

作者&#xff1a;zzy的学习笔记 记录一次开机内存分析的全过程&#xff0c;尽量详尽的介绍常用内存分析工具和命令行的使用&#xff0c;结合具体问题探讨开机内存分析的实践经验。通过这篇文章我会介绍开机内存的常用测试分析工具的基本使用方法&#xff0c;以及如何通过抓取出…

在UMG中播放图像序列,出现卡帧怎么办?

在虚幻引擎中播放图像序列 前期步骤可以参考上面链接中官方文档的步骤1-13 如果在媒体播放器中播放的时候&#xff0c;出现卡帧现象&#xff0c;说明你的图片序列的帧率与默认的不匹配 需要在lmg Media Source类型文件中&#xff0c;覆写你的帧率 比如&#xff0c;我的图片序…

VSCode错误整理

文章目录 一、zsh: command not found: python二、Python pip安装Django异常Could not find a version that satisfies the requirement pytz (from django)三、WARNING: You are using pip version 21.2.4, however version 23.2.1 is available.四、pip install django下载报…

Sui参会必备|Token 2049活动一览

TOKEN2049是在新加坡举办的一年一度首屈一指地加密货币活动&#xff0c;吸引了顶级的Web3公司和项目的创始人和高管&#xff0c;他们将在这里分享行业观点、聚焦全球发展&#xff0c;同时以独特且广泛的视角审视这个生态系统及其广阔的机会。 自5月份主网上线以来&#xff0c;S…