Java整合MQTT
上一节知道MQTT是一个通信协议,需要一个代理服务Broker;通信设备作为客户端Client,后台系统服务器也作为客户端Client。
经过了解选用EMQX作为代理服务Broker(支持WEB界面查看)
后台服务使用Spring Integration链接EMQX
1.EMQX
简介,EMQX 是一个开源的分布式物联网 MQTT 消息服务器,它实现了 MQTT 协议的各种功能,并提供了可靠的消息传递、灵活的消息路由、可扩展的集群和高可用性等特性。EMQ X 可以作为物联网应用的消息中间件,用于连接和管理大规模的物联网设备,实现设备之间的实时通信和数据传输。
语言上,EMQ X 是使用 Erlang/OTP 编程语言实现的。Erlang 是一种通用的函数式编程语言,它具有高并发、分布式和容错性的特点,非常适合构建可扩展和可靠的分布式系统。Erlang/OTP 是一个开发框架和平台,提供了一系列工具、库和标准库,用于开发并运行 Erlang 应用程序。
EMQ X 选择使用 Erlang/OTP 的主要原因是 Erlang 在处理并发和分布式通信方面具有优秀的性能和可靠性。Erlang 提供了轻量级进程(而非操作系统线程)的并发模型,每个进程具有独立的状态和消息传递机制,可以高效地处理大量的并发连接和消息处理。此外,Erlang 的可扩展性和容错性使得 EMQ X 能够在分布式环境中实现高可用性和高性能。
Erlang/OTP 还提供了许多用于网络通信、并发控制和错误处理的库和工具,这些功能对于构建物联网 MQTT 服务器非常有用。通过使用 Erlang/OTP,EMQ X 能够提供可靠的消息传递、灵活的消息路由和高可用性等关键功能,以满足物联网应用的需求
主要特点:
- MQTT 协议支持:EMQ X 是一个完全兼容 MQTT 协议的消息服务器,支持 MQTT 3.1.1 和 MQTT 5.0 版本。
- 分布式架构:EMQ X 可以在多台服务器上进行水平扩展,形成分布式集群,以处理大规模的设备连接和消息传输。
- 高可用性:EMQ X 支持主从复制和自动故障切换,以实现高可用性和容错性,保证设备和应用程序的持续可靠性。
- 灵活的消息路由:EMQ X 提供了丰富的消息路由功能,可以根据主题、内容、设备属性等灵活地进行消息过滤和转发。
- 安全性和权限控制:EMQ X 支持 SSL/TLS 加密和身份验证,可以对连接和消息进行安全保护,同时提供细粒度的权限控制,确保只有授权的设备和用户可以访问特定的主题和功能。
- 插件系统:EMQ X 提供了丰富的插件系统,可以通过插件扩展和定制功能,满足不同场景和需求的应用。
- 实时监控和统计:EMQ X 提供了实时监控和统计功能,可以监控设备连接数、消息发布和订阅情况等指标,帮助用户了解系统运行状态和性能。
- 多协议支持:除了 MQTT,EMQ X 还支持 AMQP、CoAP、WebSocket 等多种协议,以满足不同设备和应用的通信需求。
由于是测试关系,我选择下载WINDOWS版本
默认账户密码为
admin
public
2.Spring Integration
Spring Integration 是一个基于 Spring 框架的集成框架,用于构建企业级应用程序中的消息驱动和事件驱动的系统。它提供了一组丰富的组件和工具,用于实现应用程序之间的异步消息传递、事件驱动的处理和系统集成。
Spring Integration 提供了一种声明式的方式来定义和管理消息流,它基于传统的企业集成模式(Enterprise Integration Patterns,EIP),使开发者能够通过简单的配置来实现消息的路由、转换、过滤、聚合、分割等操作。开发者可以使用 Spring Integration 提供的各种消息通道、消息适配器、消息处理器和消息端点来搭建一个灵活、可扩展和可靠的消息驱动系统。
Spring Integration 支持多种消息协议和传输方式,包括 JMS、AMQP、MQTT、HTTP、TCP、UDP 等,可以与各种消息代理、消息队列和消息中间件进行集成。它还提供了与 Spring 框架的其他模块(如 Spring Boot、Spring MVC、Spring Batch 等)的无缝集成,使得构建复杂的企业应用程序变得更加简单和高效。
Spring Integration 的主要目标是帮助开发者构建可扩展和可维护的集成解决方案,通过解耦和模块化的方式来处理异步消息和事件。它广泛应用于各种领域,包括企业集成、消息驱动的微服务架构、大数据处理等。
POM引入依赖
<!-- mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
配置类MqttConfiguration
/**
* mqtt配置类
*/
@Component
@Configuration
@Data
@ConfigurationProperties("mqtt")
public class MqttConfiguration {
@Autowired
private MqttCustomerClient mqttCustomerClient;
private String host;
private String clientId;
private String username;
private String password;
private String topic;
private int timeout;
private int keepAlive;
@Bean
public MqttCustomerClient getMqttCustomerClient() {
mqttCustomerClient.connect(host, clientId, username, password, timeout,keepAlive);
// 以/#结尾表示订阅所有以test开头的主题
mqttCustomerClient.subscribe("test/#");
return mqttCustomerClient;
}
}
封装客户端Bean对象MqttCustomerClient
package com.wn.mqtt.util;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* mqtt客户端
*/
@Slf4j
@Component
public class MqttCustomerClient {
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
public static MqttClient getClient(){
return client;
}
public static void setClient(MqttClient client){
MqttCustomerClient.client=client;
}
/**
* 客户端连接
*
* @param host ip+端口
* @param clientID 客户端Id
* @param username 用户名
* @param password 密码
* @param timeout 超时时间
* @param keeplive 保留数
*/
public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){
MqttClient client;
try {
client=new MqttClient(host,clientID,new MemoryPersistence());
MqttConnectOptions options=new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keeplive);
MqttCustomerClient.setClient(client);
try {
client.setCallback(pushCallback);
client.connect(options);
}catch (Exception e){
e.printStackTrace();
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 发布,默认qos为0,非持久化
* @param topic
* @param pushMessage
*/
public void pushlish(String topic,String pushMessage){
pushlish(0,false,topic,pushMessage);
}
/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public void pushlish(int qos,boolean retained,String topic,String pushMessage){
MqttMessage message=new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mqttTopic= MqttCustomerClient.getClient().getTopic(topic);
if(null== mqttTopic){
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
token=mqttTopic.publish(message);
token.waitForCompletion();
}catch (MqttPersistenceException e){
e.printStackTrace();
}catch (MqttException e){
e.printStackTrace();
}
}
/**
* 订阅某个主题,qos默认为0
* @param topic
*/
public void subscribe(String topic){
log.error("开始订阅主题" + topic);
subscribe(topic,0);
}
public void subscribe(String topic,int qos){
try {
MqttCustomerClient.getClient().subscribe(topic,qos);
}catch (MqttException e){
e.printStackTrace();
}
}
}
消息监听类PushCallback
/**
* 消费监听
*/
@Component
public class PushCallback implements MqttCallback {
private static MqttClient client;
@Override
public void connectionLost(Throwable throwable) {
if (client == null || !client.isConnected()) {
System.out.println("连接断开,正在重连....");
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
测试类
@Autowired
private MqttCustomerClient mqttCustomerClient;
@Test
void pushlish() {
for (int i = 0; i < 10; i++) {
mqttCustomerClient.pushlish("test/device1", "hello mqtt............" + i);
try {
Thread.*sleep*(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
EMQX WEB监控界面接收消息数据