1.在maven里面配置好依赖
<!-- Eclipse Paho MQTT v3 Java client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
2.创建 App 类
package com.leo;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Minimal Eclipse Paho demo: connects to an MQTT broker, subscribes to a topic,
 * publishes one message to it, and then stays alive to receive messages until
 * the running thread is interrupted.
 */
public class App {

    /**
     * Runs the demo against the hard-coded broker settings below.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String subTopic = "topic"; // topic to subscribe to
        String pubTopic = "topic";
        String content = "Hello World66+6"; // payload text
        int qos = 2;
        String broker = "tcp://ip:1883"; // replace "ip" with your own server address
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        // Declared outside the try block so the finally block can release it.
        MqttClient client = null;
        // Remembers an interrupt so the flag can be restored after cleanup.
        boolean interrupted = false;
        try {
            client = new MqttClient(broker, clientId, persistence);

            // MQTT connection options
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("name"); // user name
            connOpts.setPassword("12345".toCharArray()); // password
            // Clean session: start without any session state kept from a previous connection.
            connOpts.setCleanSession(true);

            // Register the callback before connecting
            client.setCallback(new OnMessageCallback());

            // Establish the connection
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // Subscribe
            client.subscribe(subTopic);

            // Build and publish the message; encode explicitly as UTF-8 so the bytes
            // do not depend on the platform default charset.
            MqttMessage message =
                    new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            // Keep the program running until the thread is interrupted.
            while (!interrupted) {
                try {
                    Thread.sleep(1000); // wake up once per second
                } catch (InterruptedException e) {
                    // Leave the loop instead of swallowing the interrupt.
                    interrupted = true;
                }
            }
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        } finally {
            shutdown(client);
            if (interrupted) {
                // Restore the flag only after cleanup so the blocking disconnect is not cut short.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Disconnects (if connected) and closes the client; tolerates a null client. */
    private static void shutdown(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException e) {
            System.out.println("Failed to close MQTT client: " + e);
        }
    }

    /** Callback that logs connection loss, incoming messages and completed deliveries. */
    static class OnMessageCallback implements MqttCallback {

        /** Logs the reason the connection was lost. */
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("Connection lost: " + cause.getMessage());
        }

        /** Logs the payload of a received message, decoded as UTF-8. */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("Message arrived: "
                    + new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
        }

        /** Logs the token of a message whose delivery has completed. */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("Delivery complete for token: " + token);
        }
    }
}
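3.创建回调消息处理类 OnMessageCallback(可选:上面的 App 已经自带同名的静态内部类;如果改用下面这个独立的类,需要删除 App 中的内部类,并把包名改为 com.leo 或在 App 中 import io.emqx.OnMessageCallback)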
package io.emqx;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class OnMessageCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
4.结果展示
至此第一部分的功能已经完成了,以下是扩展功能
5.整合到springboot
创建一个 MqttService 类
package com.leo.service;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
/**
 * Spring service wrapping the Eclipse Paho demo client: connects to the broker,
 * subscribes, publishes one message and then keeps receiving messages.
 */
@Service
public class MqttService {

    /**
     * Connects to the hard-coded broker, subscribes to the topic, publishes one
     * message and then blocks the calling thread until it is interrupted. The
     * client is disconnected and closed before the method returns.
     */
    public void startMqttClient() {
        String subTopic = "topic"; // topic to subscribe to
        String pubTopic = "topic";
        String content = "Hello World66+6";
        int qos = 2;
        String broker = "tcp://ip:1883"; // replace "ip" with your own server address
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        // Declared outside the try block so the finally block can release it.
        MqttClient client = null;
        // Remembers an interrupt so the flag can be restored after cleanup.
        boolean interrupted = false;
        try {
            client = new MqttClient(broker, clientId, persistence);

            // MQTT connection options
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("8266");
            connOpts.setPassword("12345".toCharArray());
            // Clean session: start without any session state kept from a previous connection.
            connOpts.setCleanSession(true);

            // Register the callback before connecting
            client.setCallback(new OnMessageCallback());

            // Establish the connection
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // Subscribe
            client.subscribe(subTopic);

            // Build and publish the message; encode explicitly as UTF-8 so the bytes
            // do not depend on the platform default charset.
            MqttMessage message =
                    new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            // Keep the client alive until the thread is interrupted.
            while (!interrupted) {
                try {
                    Thread.sleep(1000); // wake up once per second
                } catch (InterruptedException e) {
                    // Leave the loop instead of swallowing the interrupt.
                    interrupted = true;
                }
            }
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        } finally {
            shutdown(client);
            if (interrupted) {
                // Restore the flag only after cleanup so the blocking disconnect is not cut short.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Disconnects (if connected) and closes the client; tolerates a null client. */
    private static void shutdown(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException e) {
            System.out.println("Failed to close MQTT client: " + e);
        }
    }

    /** Callback that logs connection loss, incoming messages and completed deliveries. */
    static class OnMessageCallback implements MqttCallback {

        /** Logs the reason the connection was lost. */
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("Connection lost: " + cause.getMessage());
        }

        /** Logs topic, id, QoS, message class and UTF-8 decoded payload of a received message. */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("Message arrived: " +
                    "Topic: " + topic +
                    ", ID: " + message.getId() +
                    ", QoS: " + message.getQos() +
                    ", Class: " + message.getClass().getName() +
                    ", Payload: "
                    + new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8)
            );
        }

        /** Logs the token of a message whose delivery has completed. */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("Delivery complete for token: " + token);
        }
    }
}
在 Spring Boot 应用中调用服务
在 SprintBootDemo1Application 中调用 MqttService 的 startMqttClient 方法。由于 startMqttClient 内部有一个保持运行的循环,会一直阻塞调用它的线程,为了不阻塞 Spring Boot 的启动过程,需要在 run 方法中新开一个线程来运行 MQTT 客户端逻辑。
package com.leo;
import com.leo.service.MqttService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point that also launches the MQTT client from its
 * {@link CommandLineRunner#run(String...)} hook.
 */
@SpringBootApplication
public class SprintBootDemo1Application implements CommandLineRunner {

    @Autowired
    private MqttService mqttService;

    /** Boots the Spring application. */
    public static void main(String[] args) {
        SpringApplication.run(SprintBootDemo1Application.class, args);
    }

    /**
     * Hands the MQTT client off to a separate thread, because
     * {@code startMqttClient()} is called from a task that would otherwise
     * occupy the thread executing this runner.
     */
    @Override
    public void run(String... args) throws Exception {
        Runnable mqttTask = () -> mqttService.startMqttClient();
        Thread mqttThread = new Thread(mqttTask);
        mqttThread.start();
    }
}
就ok了