1.MQTT协议概述
MQTT是一种基于发布/订阅模式的轻量级消息传输协议,常用于低带宽、不可靠网络环境下传输消息,适用于物联网设备之间的通信。
1.1 MQTT协议的组件
- 客户端(Client):连接到MQTT代理服务器的设备(发布者、订阅者)
- 代理服务器(Broker):负责接收来自客户端的消息并将其转发给订阅该主题的客户端
- 主题(Topic):消息的分类标识,客户端通过订阅或发布到特定主题来接受或发送消息
- 消息(Message):客户端通过主题传输的数据负载
客户端通过TCP/IP连接到代理,通过身份验证保持长连接
2.MQTT服务器搭建
用EMQX来搭建服务器
快速开始 | EMQX 4.3 文档https://docs.emqx.com/zh/emqx/v4.3/getting-started/getting-started.html 下载EMQX
https://www.emqx.com/zh/try?product=brokerhttps://www.emqx.com/zh/try?product=broker 安装虚拟机(环境:centOS7)
配置 EMQX Yum 源
curl -s https://assets.emqx.com/scripts/install-emqx-rpm.sh | sudo bash
安装依赖
yum install epel-release -y
yum install -y openssl11 openssl11-devel
安装 EMQX
sudo yum install emqx -y
启动 EMQX
sudo systemctl start emqx
访问管理后台:http://localhost:18083/#/login?to=/dashboard/overview
(默认用户名:admin 密码:public)
访问接口文档: http://localhost:18083/api-docs/index.html#
2.1 创建主题
3.使用EMQX服务器
如何在 Java 中使用 MQTT | EMQ本文主要介绍如何在 Java 项目中使用 MQTT,实现 MQTT 客户端与服务器的连接、订阅和收发消息等功能。https://www.emqx.com/zh/blog/how-to-use-mqtt-in-java
3.1 导入依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
3.2 创建MQTT连接
String broker = "tcp://localhost:1883";
// TLS/SSL
// String broker = "ssl://broker.emqx.io:8883";
String username = "emqx";
String password = "public";
String clientid = "publish_client";
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
client.connect(options);
- MqttClient: 同步调用客户端,使用阻塞方法通信。
- MqttClientPersistence: 代表一个持久的数据存储,用于在传输过程中存储出站和入站的信息,使其能够传递到指定的 QoS。
- MqttConnectOptions: 连接选项,用于指定连接的参数,下面列举一些常见的方法。
- setUserName: 设置用户名
- setPassword: 设置密码
- setCleanSession: 设置是否清除会话
- setKeepAliveInterval: 设置心跳间隔
- setConnectionTimeout: 设置连接超时时间
- setAutomaticReconnect: 设置是否自动重连
3.3 发布MQTT消息
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class PublishSample {
public static void main(String[] args) {
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";
String clientid = "publish_client";
String content = "Hello MQTT";
int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
// 设置用户名和密码
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
// 连接
client.connect(options);
// 创建消息并设置 QoS
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
// 发布消息
client.publish(topic, message);
System.out.println("Message published");
System.out.println("topic: " + topic);
System.out.println("message content: " + content);
// 关闭连接
client.disconnect();
// 关闭客户端
client.close();
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
3.4 订阅MQTT主题
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class SubscribeSample {
public static void main(String[] args) {
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";
String clientid = "subscribe_client";
int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
// 设置回调
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println("message content: " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
});
client.connect(options);
client.subscribe(topic, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- connectionLost(Throwable cause): 连接丢失时被调用
- messageArrived(String topic, MqttMessage message): 接收到消息时被调用
- deliveryComplete(IMqttDeliveryToken token): 消息发送完成时被调用