目录
1、关于header问题
2、MQTT 连接参数的使用
2.1连接地址
2.2 基于 TCP 的 MQTT 连接
2.3 基于 WebSocket 的连接
3、订阅topic
4、推送消息给订阅者
5、QOS 机制
5.1 QOS是什么
5.2 QOS的实现原理
5.3 发送流程
6、reatain机制
总结:给还没上线的人留言
7、遗嘱消息
总结:给还在等消息的人留言
8、Clean Session
9、cleint id
如果客户端使用一个重复的 Client ID 连接至服务器,将会把已使用该 Client ID 连接成功的客户端踢下线。
10、连接超时(Connect Timeout)
11、总结
1、关于header问题
mqtt的一直说header比较节省流量,这是为什么呐、?看下结构图
可以看到有不同的header结构,字段也很少,确实节省流量
2、MQTT 连接参数的使用
2.1连接地址
MQTT 的连接地址通常包含 :服务器 IP 或者域名、服务器端口、连接协议。
2.2 基于 TCP 的 MQTT 连接
mqtt 是普通的 TCP 连接,端口一般为 1883。
mqtts 是基于 TLS/SSL 的安全连接,端口一般为 8883。
比如 mqtt://broker.emqx.io:1883 是一个基于普通 TCP 的 MQTT 连接地址。
2.3 基于 WebSocket 的连接
ws 是普通的 WebSocket 连接,端口一般为 8083。
wss 是基于 WebSocket 的安全连接,端口一般为 8084。
当使用 WebSocket 连接时,连接地址还需要包含 Path,EMQX 默认配置的 Path 是 /mqtt。比如 ws://broker.emqx.io:8083/mqtt 是一个基于 WebSocket 的 MQTT 连接地址。
3、订阅topic
客户端订阅topic之后,服务器是如何保存,并且如何转发的。
可以看到服务端订阅之后会放入一个set,在做转发的时候动态匹配,匹配成功之后才会进行转发。
这里也是用了线程池
com.lxr.iot.bootstrap.channel.MqttHandlerService#subscribe
/**
* 订阅
*/
@Override
public void subscribe(Channel channel, MqttSubscribeMessage mqttSubscribeMessage) {
Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription ->
mqttTopicSubscription.topicName()
).collect(Collectors.toSet());
mqttChannelService.suscribeSuccess(mqttChannelService.getDeviceId(channel), topics);
subBack(channel, mqttSubscribeMessage, topics.size());
}
/**
* 订阅成功后 (发送保留消息)
*/
public void suscribeSuccess(String deviceId, Set<String> topics){
doIfElse(topics,topics1->!CollectionUtils.isEmpty(topics1),strings -> {
MqttChannel mqttChannel = mqttChannels.get(deviceId);
mqttChannel.setSubStatus(SubStatus.YES); // 设置订阅主题标识
mqttChannel.addTopic(strings);
executorService.execute(() -> {
Optional.ofNullable(mqttChannel).ifPresent(mqttChannel1 -> {
if(mqttChannel1.isLogin()){
strings.parallelStream().forEach(topic -> {
addChannel(topic,mqttChannel);
sendRetain(topic,mqttChannel); // 发送保留消息
});
}
});
});
});
}
4、推送消息给订阅者
遍历所有的channel,根据不同的QOS进行转发。
这里的channel也做了使用一个map进行保存
protected static Cache<String, Collection<MqttChannel>> mqttChannelCache = CacheBuilder.newBuilder().maximumSize(100).build();
com.lxr.iot.bootstrap.channel.MqttChannelService#push
/**
* 推送消息给订阅者
*/
private void push(String topic, MqttQoS qos, byte[] bytes, boolean isRetain){
Collection<MqttChannel> subChannels = getChannels(topic, topic1 -> cacheMap.getData(getTopic(topic1)));
if(!CollectionUtils.isEmpty(subChannels)){
subChannels.parallelStream().forEach(subChannel -> {
switch (subChannel.getSessionStatus()){
case OPEN: // 在线
if(subChannel.isActive()){ // 防止channel失效 但是离线状态没更改
switch (qos){
case AT_LEAST_ONCE:
sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,subChannel,topic,bytes);
break;
case AT_MOST_ONCE:
sendQos0Msg(subChannel.getChannel(),topic,bytes);
break;
case EXACTLY_ONCE:
sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,subChannel,topic,bytes);
break;
}
}
else{
if(!subChannel.isCleanSession() & !isRetain){
clientSessionService.saveSessionMsg(subChannel.getDeviceId(),
SessionMessage.builder().byteBuf(bytes).qoS(qos).topic(topic).build() );
break;
}
}
break;
case CLOSE: // 连接 设置了 clean session =false
clientSessionService.saveSessionMsg(subChannel.getDeviceId(),
SessionMessage.builder().byteBuf(bytes).qoS(qos).topic(topic).build() );
break;
}
});
}
}
5、QOS 机制
5.1 QOS是什么
可靠的消息传递
MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。
- QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
- QoS 1:消息至少传送一次。
- QoS 2:消息只传送一次。
5.2 QOS的实现原理
PublishApiSevice
1.2.1、QOS中1和2 需要确认,这里做了一个缓存
channel代表会话
@Getter
@Setter
public class MqttChannel {
private transient volatile Channel channel;
private String deviceId;
private boolean isWill;
private volatile SubStatus subStatus; // 是否订阅过主题
private Set<String> topic ;
private volatile SessionStatus sessionStatus; // 在线 - 离线
private volatile boolean cleanSession; // 当为 true 时 channel close 时 从缓存中删除 此channel
// messageId - message(qos1) // 待确认消息
private ConcurrentHashMap<Integer,SendMqttMessage> message ;
private AtomicInteger index ;
看下消息的定义
/**
* mqtt 消息
**/
@Builder
@Data
public class SendMqttMessage {
private int messageId;
private Channel channel;
private volatile ConfirmStatus confirmStatus;
private long time;
private byte[] byteBuf;
private boolean isRetain;
private MqttQoS qos;
private String topic;
}
/**
* 确认状态
*
**/
public enum ConfirmStatus {
PUB,
PUBREC,
PUBREL,
COMPLETE,
}
public enum MqttQoS {
AT_MOST_ONCE(0),
AT_LEAST_ONCE(1),
EXACTLY_ONCE(2),
FAILURE(0x80);
private final int value;
MqttQoS(int value) {
this.value = value;
}
public int value() {
return value;
}
public static MqttQoS valueOf(int value) {
for (MqttQoS q: values()) {
if (q.value == value) {
return q;
}
}
throw new IllegalArgumentException("invalid QoS: " + value);
}
}
这里面有几个针对QOS的字段
messageId 是消息的唯一Id
ConfirmStatus 是消息的状态
MqttQoS 是消息确认状态的枚举
5.3 发送流程
protected void sendQosConfirmMsg(MqttQoS qos, MqttChannel mqttChannel, String topic, byte[] bytes) {
if(mqttChannel.isLogin()){
int messageId = mqttChannel.messageId();
switch (qos){
case AT_LEAST_ONCE:
mqttChannel.addSendMqttMessage(messageId,sendQos1Msg(mqttChannel.getChannel(),topic,false,bytes,messageId));
break;
case EXACTLY_ONCE:
mqttChannel.addSendMqttMessage(messageId,sendQos2Msg(mqttChannel.getChannel(),topic,false,bytes,messageId));
break;
}
}
}
待客户端响应之后,修改message的ConfirmStatus
/**
* 消息回复确认(qos1 级别 保证收到消息 但是可能会重复)
*/
@Override
public void puback(Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
int messageId = messageIdVariableHeader.messageId();
Optional.ofNullable(mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId))
.ifPresent(msg->msg.setConfirmStatus(ConfirmStatus.COMPLETE)); // 复制为空
messageTransfer.removeQueue(channel,messageId);
}
待状态都确认完成之后,移除消息
MQTT QoS 0, 1, 2 介绍 | EMQ
6、reatain机制
发布者发布消息时,如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(Retained Message)。MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。
AbstractChannelService
// topic - 保留消息
protected static ConcurrentHashMap<String,ConcurrentLinkedQueue<RetainMessage>> retain = new ConcurrentHashMap<>();
下面的代码将retainMessage加入到缓存中
/**
* 保存保留消息
* @param topic 主题
* @param retainMessage 信息
*/
private void saveRetain(String topic, RetainMessage retainMessage, boolean isClean){
ConcurrentLinkedQueue<RetainMessage> retainMessages = retain.getOrDefault(topic, new ConcurrentLinkedQueue<>());
if(!retainMessages.isEmpty() && isClean){
retainMessages.clear();
}
boolean flag;
do{
flag = retainMessages.add(retainMessage);
}
while (!flag);
retain.put(topic, retainMessages);
}
订阅成功后发送retain消息
/**
* 订阅成功后 (发送保留消息)
*/
public void suscribeSuccess(String deviceId, Set<String> topics){
doIfElse(topics,topics1->!CollectionUtils.isEmpty(topics1),strings -> {
MqttChannel mqttChannel = mqttChannels.get(deviceId);
mqttChannel.setSubStatus(SubStatus.YES); // 设置订阅主题标识
mqttChannel.addTopic(strings);
executorService.execute(() -> {
Optional.ofNullable(mqttChannel).ifPresent(mqttChannel1 -> {
if(mqttChannel1.isLogin()){
strings.parallelStream().forEach(topic -> {
addChannel(topic,mqttChannel);
sendRetain(topic,mqttChannel); // 发送保留消息
});
}
});
});
});
}
总结:给还没上线的人留言
7、遗嘱消息
遗嘱消息是 MQTT 为那些可能出现意外断线的设备提供的将遗嘱优雅地发送给其他客户端的能力。设置了遗嘱消息消息的 MQTT 客户端异常下线时,MQTT 服务器会发布该客户端设置的遗嘱消息。
- 当设备意外断线时,遗嘱消息将被发送至遗嘱 Topic;
public void doSend( String deviceId) { // 客户端断开连接后 开启遗嘱消息发送
if(StringUtils.isNotBlank(deviceId)&&(willMeaasges.get(deviceId))!=null){
WillMeaasge willMeaasge = willMeaasges.get(deviceId);
channelService.sendWillMsg(willMeaasge); // 发送遗嘱消息
if(!willMeaasge.isRetain()){ // 移除
willMeaasges.remove(deviceId);
log.info("deviceId will message["+willMeaasge.getWillMessage()+"] is removed");
}
}
}
总结:给还在等消息的人留言
8、Clean Session
为 false 时表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。为 true 时表示创建一个新的临时会话,在客户端断开时,会话自动销毁。
持久会话避免了客户端掉线重连后消息的丢失,并且免去了客户端连接后重复的订阅开销。这一功能在带宽小,网络不稳定的物联网场景中非常实用。
MqttChannel
private volatile boolean cleanSession; // 当为 true 时 channel close 时 从缓存中删除 此channel
9、cleint id
如果客户端使用一个重复的 Client ID 连接至服务器,将会把已使用该 Client ID 连接成功的客户端踢下线。
10、连接超时(Connect Timeout)
连接超时时长,收到服务器连接确认前的等待时间,等待时间内未收到连接确认则为连接失败。
AbsMqttProducer
protected void connectTo(ConnectOptions connectOptions){
checkConnectOptions(connectOptions);
if(this.nettyBootstrapClient ==null){
this.nettyBootstrapClient = new NettyBootstrapClient(connectOptions);
}
this.channel =nettyBootstrapClient.start();
initPool(connectOptions.getMinPeriod());
try {
countDownLatch.await(connectOptions.getConnectTime(), TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("InterruptedException",e);
nettyBootstrapClient.doubleConnect(); // 重新连接
}
}
11、总结
在工作中一直使用emqx,但是不知道业务原理,虽然emqx是开源的,但是因为开发语言是erlang,也不好下手去读,在网上随便找了一个开源的实现,代码很老,但是基本的功能属性都有
https://github.com/1ssqq1lxr/iot_push