文章目录
- 1 MQTT协议与EMQ中间件
- 1.1 物联网消息协议MQTT
- 1.1.1 什么是MQTT
- 1.1.2 MQTT相关概念
- 1.1.3 消息服务质量QoS——信息的可靠投递
- 1.1.3.1 QoS0——消息服务质量为0,消息发送至多一次
- 1.1.3.2 QoS1——消息发送至少一次
- 1.1.3.3 QoS2——消息发送仅一次
- 1.1.3.4 不同情况下客户端收到的消息QoS
- 1.1.4 topic通配符匹配规则
- 1.1.4.1 层级分隔符:/
- 1.1.4.2 多层通配符:#
- 1.1.4.3 单层通配符:+
- 1.1.5 MQTT优点
- 1.2 物联网消息中间件EMQX
- 1.2.1 什么是EMQX
- 1.2.2 EMQ环境安装——基于Docker
- 1.2.3 EMQ客户端
- 1.2.3.1 EMQDashboard
- 1.2.3.2 EMQTTX
- 1.2.4 延迟消息
- 1.2.5 共享订阅
- 1.2.5.1 不带群组的共享订阅
- 1.2.5.2 带群组的共享订阅
- 1.3 Eclipse Paho
- 1.3.1 Eclipse Paho是什么
- 1.3.2 Eclipse Paho快速入门
- 1.3.2.1 集成Eclipse paho
- 1.3.2.2 发布消息到EMQ
- 1.3.2.3订阅消息
1 MQTT协议与EMQ中间件
1.1 物联网消息协议MQTT
1.1.1 什么是MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议):其时基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议。
客户端 | 服务端(消息代理Broker) |
---|---|
发布其他客户端可能会订阅的消息 | 接收来自客户端的网络连接 |
订阅其他客户端发布的消息 | 接收客户点发布的应用消息 |
退订或删除应用程序的消息 | 处理来自客户端的订阅和退订请求 |
断开与服务器的连接 | 向订阅的客户转发应用和程序消息 |
1.1.2 MQTT相关概念
MQTT常用方法 | 解释 |
---|---|
CONNECT | 客户端连接到服务器 |
CONNACK | 连接确认 |
PUBLISH | 发布消息 |
PUBACK | 发布确认 |
PUBREC | 发布的消息已接收 |
PUBREL | 发布的消息已释放 |
PUBCOMP | 发布完成 |
SUBSCRIBE | 订阅请求 |
SUBACK | 订阅确认 |
UNSUBSCRIBE | 取消订阅 |
UNSUBACK | 取消订阅确认 |
PINGREQ | 客户端发送心跳 |
PINGRESP | 服务端心跳响应 |
DISCONNECT | 断开连接 |
AUTH | 认证 |
1.1.3 消息服务质量QoS——信息的可靠投递
MQTT协议中规定了消息服务质量(QoS),其保证了在不同网络环境下信息传递的可靠性。MQTT设计了QoS0、QoS1、QoS2三个QoS等级。
QoS等级 | 说明 | 注意 |
---|---|---|
QoS0 | 消息最多 传递一次 | 消息发布完全依赖底层TCP/IP网络,会发生消息丢失,消息不会被接收端应答,也不会被发送者存储再发送,称之为“即发即弃 ” |
QoS1 | 消息至少 传递一次 | |
QoS2 | 消息仅 传递一次 |
1.1.3.1 QoS0——消息服务质量为0,消息发送至多一次
消息服务质量为0(QoS0):
消息发送至多一次
,消息发布完全依赖底层TCP/IP网络,会发生消息丢失
,消息不会被接收端应答,也不会被发送者存储再发送,称之为“即发即弃
”
1.1.3.2 QoS1——消息发送至少一次
消息服务质量为1(QoS1):
消息发送至少一次
,确保消息送达
,但可能发生消息重复投递
,发送者会存储消息直到接收者反馈回Puback(发布确认)格式的应答确认。
1.1.3.3 QoS2——消息发送仅一次
消息服务质量为2(QoS2):
消息发送仅一次
,确保消息到达一次
,他将相应的处理Publish(发布)消息,并通过Pubrec(发布收到)向发送方确认。
1.1.3.4 不同情况下客户端收到的消息QoS
发布消息的QoS | 主题订阅的QoS | 接收消息的QoS |
---|---|---|
0 | 0 | 0 |
0 | 1 | 0 |
0 | 2 | 0 |
1 | 2 | 1 |
2 | 1 | 1 |
2 | 2 | 2 |
总结由上表得出:若,发布的消息QoS为m,主题订阅的消息QoS为m,接收消息的QoS为h时:
- ① 若m=0或n=0,则h=0;
- ② 若0<m< n ,则h=n;
- ③ 若0<n<m,则h=n。
1.1.4 topic通配符匹配规则
1.1.4.1 层级分隔符:/
层级分隔符 / :用来分割主题树的每一层,为主题(Topic)空间提供分等级结构,适用于:当两个通配符在一个主题中出现的时候。
例如:love/you/with/all/my
1.1.4.2 多层通配符:#
多层通配符 # :多层通配符表示≥0的层次。因此,love/#也可以匹配到单独的
love
,此时#代表love
后边没有的0层。
注意:多层通配符必须是主题树最后一个字符,例如:love/#可以匹配到love,但是love/#/with是无效的。
1.1.4.3 单层通配符:+
单层通配符 + :只匹配一层,例如:love/you/#匹配love/you/with,但是不能匹配love/you/with/all,他只额能通配一个字符。
1.1.5 MQTT优点
① 精简,不添加冗余功能;② 发布/订阅模式,方便消息在传感器间传递,客户端与服务端完成解耦;③ 动态创建主题(不需要预先创建主题),零运维成本。④支持连续的会话保持和控制(心跳检测);⑤ 提供服务质量(quality of service level:QoS)管理;⑥ 不强求传输数据格式与类型。
1.2 物联网消息中间件EMQX
1.2.1 什么是EMQX
EMQ X Broker 是基于高并发的Erlang/OTP语言平台开发,支持百万级连接与分布式集群架构,基于MQTT协议的消息服务器。
参考EMQX官网
EMQX特点:
- ① 基于MQTT协议实现得开源消息中间件,②支持桥接和共享订阅,③中国本地技术支持服务。
1.2.2 EMQ环境安装——基于Docker
docker pull emqx/emqx:v4.1.0
docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.1.0
1.2.3 EMQ客户端
1.2.3.1 EMQDashboard
MQTT X 是 EMQ 开源的一款优雅的跨平台 MQTT 5.0 桌面客户端
1.2.3.2 EMQTTX
EMQ X 提供 Dashboard客户端方便用户管理设备与监控相关指标。
默认用户名是 admin,密码是 public
1.2.4 延迟消息
步骤:
- ①用户需开启模块的emqx_mod_delayed
- ②主题格式:
$delayed/{DelayInterval}/{TopicName}
注意:
① 使用 $delayed 作为主题前缀的消息都将被视为需要延迟发布的消息。
② {DelayInterval}: 指定该 MQTT 消息延迟发布的时间间隔,单位是秒,允许的最大间隔是 4294967 秒。
③ {TopicName}: MQTT 消息的主题名称。
1.2.5 共享订阅
注意:分组发送是每一个组里选一个,因此十分适合微服务集群。
1.2.5.1 不带群组的共享订阅
格式:
$queue/{TopicName}
EMQ X的共享订阅支持均衡策略配置(默认Random): etc/emqx.con
1.2.5.2 带群组的共享订阅
$share/<group-name>/{TopicName}
1.3 Eclipse Paho
1.3.1 Eclipse Paho是什么
Eclipse paho 实现mqtt协议java客户端。类似于Mysql于JDBC。
1.3.2 Eclipse Paho快速入门
1.3.2.1 集成Eclipse paho
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
1.3.2.2 发布消息到EMQ
@GetMapping("/publish")
public void publish() throws MqttException {
MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);
//连接选项中定义用户名密码和其它配置
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
options.setAutomaticReconnect(true);//是否自动重连
options.setConnectionTimeout(30);//连接超时时间 秒
options.setKeepAliveInterval(10);//连接保持检查周期 秒
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
client.connect(options);//连接
client.publish("topic", "发送内容".getBytes(), 2, false);
}
1.3.2.3订阅消息
@GetMapping("/subscribe")
public void subscribe() throws MqttException {
MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);
//连接选项中定义用户名密码和其它配置
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
options.setAutomaticReconnect(true);//是否自动重连
options.setConnectionTimeout(30);//连接超时时间 秒
options.setKeepAliveInterval(10);//连接保持检查周期 秒
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
client.setCallback(new MqttCallbackExtended() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("连接丢失!");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println( "接收到消息 topic:" +s+" id:"+mqttMessage.getId() +" message:"+ mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
@Override
public void connectComplete(boolean b, String s) {
System.out.println("连接成功!");
}
});
client.connect(options);//连接
client.subscribe("test"); //订阅主题
}