一. MQTT 与 EMQX
MQTT 是轻量级基于代理的发布/订阅的消息传输协议。使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。底层使用 TCP/IP 提供网络连接。
EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器,支持百万级连接和分布式集群架构。
二. 搭建MQTT服务器
在 Windows 上搭建MQTT服务器,下载 EMQX Broker 压缩包,将其解压即可。
进入压缩包的bin目录下,执行:
emqx start
则 emqx 服务器在本地启动。使用下面语句可以查看启动状态:
emqx_ctl status
打开浏览器 输入http://localhost:18083/ 进入管理界面(EMQ提供了web的控制后台,默认是18083端口)。默认用户名 admin,默认密码 public。可以看到EMQ的Dashboard界面:
三. Java 实现订阅发布
在发布订阅中,无所谓服务端和客户端,两者都是连接至 mqtt 服务器的客户端(可以查看 dashboard 的连接数)。两者都既可以发布主题,也可以订阅主题。
订阅端:
public class MqttClientTest {
//订阅的主题
public static final String TOPIC = "mqtt";public static void main(String[] args) {
MyClient myClient = new MyClient();
myClient.subscribe(TOPIC, 1);
}
}/**
* 订阅方
*/
class MyClient {
//mqtt服务器默认的地址和端口号
public static final String HOST = "tcp://localhost:1883";//连接MQTT的客户端ID,一般以唯一标识符表示
private static final String CLIENTID = "client";
//连接的用户名密码(非必需)
private String userName = "admin";
private String password = "password";private MqttClient mqttClient;
public MyClient() {
try {
mqttClient = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(60);
options.setKeepAliveInterval(10);
options.setUserName(userName);
options.setPassword(password.toCharArray());
//定义回调函数
mqttClient.setCallback(new PushCallBack());
mqttClient.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}//订阅主题
public void subscribe(String topic, int qos) {
try {
mqttClient.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
发布端:public class MqttServerTest {
//发布的主题
public static final String TOPIC = "mqtt";public static void main(String[] args) throws InterruptedException {
MqttServer mqttServer = new MqttServer();
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setPayload("第一次广播".getBytes());
mqttServer.publish(TOPIC, message);
Thread.sleep(1000);
message.setPayload("第二次广播".getBytes());
mqttServer.publish(TOPIC, message);
Thread.sleep(1000);
message.setPayload("第三次广播".getBytes());
mqttServer.publish(TOPIC, message);
}
}/**
* 发布方
*/
class MqttServer {
//mqtt服务器默认的地址和端口号
private static final String HOST = "tcp://localhost:1883";
//连接MQTT的客户端ID,一般以唯一标识符表示
private static final String CLIENTID = "server";
//连接的用户名密码(非必需)
private String userName = "admin";
private String password = "password";private MqttClient mqttClient;
//构造方法,启动mqttClient
public MqttServer() {
try {
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
mqttClient = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(60);
options.setKeepAliveInterval(10);
options.setUserName(userName);
options.setPassword(password.toCharArray());
//定义回调函数
mqttClient.setCallback(new PushCallBack());
mqttClient.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}//发布主题
public void publish(String topic, MqttMessage message) {
try {
mqttClient.publish(topic, message);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
回调函数,发布方订阅方在发布订阅完消息后的回调:
public class PushCallBack implements MqttCallback {
/**
* mqtt连接丢失时触发(不包括主动disconnect)
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("连接失败,可做重连");
}/**
* 收到订阅消息后调用
* @param s
* @param mqttMessage
* @throws Exception
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("---------------------------");
System.out.println("接收到的主题为:" + s);
System.out.println("接收到的消息为:" + new String(mqttMessage.getPayload()));
}/**
* 发布消息完成后调用
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("---------------------------");
System.out.println("广播完成");
}
}
分别启动两个程序,结果展示如下:
四. 实现连接丢失后重新连接
1. 自定义重连方式
当连接突然断开时,可在 connectionLost() 回调方法中设置重连恢复连接。修改 PushCallBack 类中的 connectionLost() 方法:
public class PushCallBack implements MqttCallback {
private MqttClient mqttClient;
private MqttConnectOptions options;public PushCallBack(MqttClient mqttClient, MqttConnectOptions options) {
this.mqttClient = mqttClient;
this.options = options;
}/**
* mqtt连接丢失时触发(不包括主动disconnect)
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
while(true) {
try {
mqttClient.connect(options);
if (mqttClient.isConnected()) {
break;
}
} catch (MqttException e) {
e.printStackTrace();
}
}
//验证重连
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setPayload("重连后的广播".getBytes());
try {
mqttClient.publish("mqtt", message);
} catch (MqttException e) {
e.printStackTrace();
}}
}
发布端的代码修改为:
//定义回调函数
mqttClient.setCallback(new PushCallBack(mqttClient, options));
注意,主动 disconnect 是不会触发 connectionLost 方法的,我们可以进入到 dashboard 中剔除发布端的连接进行测试:
可以看到,踢出后server又立即连上了:
订阅方也收到了重连后的广播:
此外要注意的是,重连后不会自动订阅之前已订阅的主题,如果有订阅主题需要手动再次订阅。
2. 自动重连
MqttConnectOptions 有个方法 setAutomaticReconnect,若将其设置为 true,则可实现在连接丢失后自动重连:
options.setAutomaticReconnect(true);
此时则无需在 connectionLost 方法里自己实现重连。
此外,实验中发现,如果 connectionLost 中写了类似于 while(true) 的死循环,automaticReconnect 自动重连不会生效。猜想是因为连接丢失后,mqttClient 首先触发 connectionLost 方法,并且进入循环出不来了,而自动重连也需要当前的 mqttClient 对象调用自动重连方法,所以将无法触发自动重连机制。
五. MqttCallbackExtended
MqttCallback 的升级版,新增了一个 connectComplete 方法,在 mqttClient 连接成功时触发。
因此我们自定义的 PushCallBack方法还可以继承自 MqttCallbackExtended,此时则可以重写 connectComplete 方法,自定义重连成功后的响应事件(一般可在此实现重连后的重新订阅)。
@Override
public void connectComplete(boolean b, String s) {
System.out.println("重连成功");
}