目录
- 1、简介
- 2、准备
- 3、使用步骤
- 3.1 引入依赖
- 3.2 创建生产者和消费者
- 3.3 测试
- 总结
- PS:
1、简介
本文章实现了一个简单的MQTT客户端
,使用Eclipse Paho库让Java和EMQX整合,测试客户端初始化时配置Broker地址、客户端ID、用户名和密码。连接成功后,订阅主题并发布消息
。
2、准备
前提是启动了EMQX服务,可以打开这个页面(目的是为了更清楚看见客户端连接和消息的发送
):
不会的可以看这篇文章:MQTT–EMQX入门+MQTTX使用
3、使用步骤
3.1 引入依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
3.2 创建生产者和消费者
生产者类:
package com.itxhj.emqxdemo.io;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class App {
public static void main(String[] args) {
String subTopic = "testtopic/#";
String pubTopic = "testtopic/1";
String content = "Hello World";
int qos = 2;
String broker = "tcp://192.168.176.128:1883"; // 地址修改成你开启EMQX的主机地址
String clientId = "emqx_test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("admin"); // 客户端的账号,并非EMQX的
connOpts.setPassword("123456".toCharArray()); // 客户端的密码,并非EMQX的
// 保留会话
connOpts.setCleanSession(true);
// 设置回调
client.setCallback(new OnMessageCallback());
// 建立连接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: " + content);
// 订阅
client.subscribe(subTopic);
// 消息发布所需参数
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(pubTopic, message);
System.out.println("Message published");
Thread.sleep(10000); // 因为断开连接EMQX那边就看不见连接了,所以sleep一会
client.disconnect();
System.out.println("Disconnected");
client.close();
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
消费者类:
package com.itxhj.emqxdemo.io;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class OnMessageCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
3.3 测试
启动main方法后,看见这个表示成功
可以看见EMQX中也有连接显示
总结
总体的步骤如下:
- 引入依赖:Eclipse Paho库
- 编写生产者:配置了Broker地址、客户端ID、用户名和密码,成功连接后订阅了主题并发布了消息。
- 编写消费者:实现消息回调,处理连接丢失和接收到的消息。
- 启动项目测试:启动项目后在控制台和EMQX管理界面中查看了连接和消息的发送情况
PS:
感谢您的阅读!如果您觉得本篇文章对您有所帮助,请给予博主一个赞喔~