目录
1.首先安装emqx,去官网下载emqx压缩包,并且解压。
2.使用emqx start 命令启动emqx后台管理
3.下载mqttx调试工具,使用mqttx调试mqtt连接。下载地址:MQTTX下载-MQTTX官方版下载,下载完成直接打开,便可进行mqtt连接调试
4.登录emqx后台,我们需要对emqx进行系统主题的订阅,给客户端设置允许系统主题的订阅,这样就能及时监听到连接设备的消息了。
5.关键代码
6.运行效果如下:
7.这次的上下线监听对比之前的方案效果可能更加,欢迎大家多多提建议,以及指正。
1.首先安装emqx,去官网下载emqx压缩包,并且解压。
2.使用emqx start 命令启动emqx后台管理
3.下载mqttx调试工具,使用mqttx调试mqtt连接。下载地址:MQTTX下载-MQTTX官方版下载,下载完成直接打开,便可进行mqtt连接调试
4.登录emqx后台,我们需要对emqx进行系统主题的订阅,给客户端设置允许系统主题的订阅,这样就能及时监听到连接设备的消息了。
5.关键代码
package com.mqtt.demo.listener;
import ch.qos.logback.core.net.server.Client;
import com.alibaba.fastjson.JSONObject;
import com.mqtt.demo.entity.QosEnum;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttOnOffListener {
private Logger logger = LoggerFactory.getLogger(MqttOnOffListener.class);
private static final String onOffLineTopic = "$SYS/brokers/+/clients/#";
private MqttClient mqttClient;
public MqttOnOffListener(MqttClient mqttClient){
this.mqttClient =mqttClient;
}
public void initListener() throws MqttException {
IMqttMessageListener listener = new IMqttMessageListener() {
@Override
public void messageArrived(String s, MqttMessage mqttMessage) {
logger.info("监听到的消息:{}", mqttMessage.toString());
JSONObject jsonObject = JSONObject.parseObject(mqttMessage.toString());
String clientId = jsonObject.getString("clientid");
logger.info("clientId:{}",clientId);
String connected_at = jsonObject.getString("connected_at");
String disconnected_at = jsonObject.getString("disconnected_at");
if(null!=connected_at && null!=disconnected_at){
logger.info("设备:{}已离线!",clientId);
}
if (null!=connected_at && null ==disconnected_at){
logger.info("设备:{}上线啦!",clientId);
}
}
};
mqttClient.subscribe(onOffLineTopic, QosEnum.QOS2.getType(),listener);
}
}