MQTT服务器源码解析

news2024/12/24 2:20:24

目录

1、关于header问题 

2、MQTT 连接参数的使用

2.1连接地址

2.2 基于 TCP 的 MQTT 连接

2.3 基于 WebSocket 的连接

3、订阅topic

 4、推送消息给订阅者

5、QOS 机制

5.1 QOS是什么

5.2 QOS的实现原理

5.3 发送流程

6、reatain机制

总结:给还没上线的人留言

7、遗嘱消息

总结:给还在等消息的人留言

8、Clean Session

9、cleint id

如果客户端使用一个重复的 Client ID 连接至服务器,将会把已使用该 Client ID 连接成功的客户端踢下线。

10、连接超时(Connect Timeout)

11、总结


 

1、关于header问题 

mqtt的一直说header比较节省流量,这是为什么呐、?看下结构图

可以看到有不同的header结构,字段也很少,确实节省流量

2、MQTT 连接参数的使用

2.1连接地址

MQTT 的连接地址通常包含 :服务器 IP 或者域名、服务器端口、连接协议。

2.2 基于 TCP 的 MQTT 连接

mqtt 是普通的 TCP 连接,端口一般为 1883。

mqtts 是基于 TLS/SSL 的安全连接,端口一般为 8883。

比如 mqtt://broker.emqx.io:1883 是一个基于普通 TCP 的 MQTT 连接地址。

2.3 基于 WebSocket 的连接

ws 是普通的 WebSocket 连接,端口一般为 8083。

wss 是基于 WebSocket 的安全连接,端口一般为 8084。

当使用 WebSocket 连接时,连接地址还需要包含 Path,EMQX 默认配置的 Path 是 /mqtt。比如 ws://broker.emqx.io:8083/mqtt 是一个基于 WebSocket 的 MQTT 连接地址。

3、订阅topic

客户端订阅topic之后,服务器是如何保存,并且如何转发的。

可以看到服务端订阅之后会放入一个set,在做转发的时候动态匹配,匹配成功之后才会进行转发。

这里也是用了线程池

com.lxr.iot.bootstrap.channel.MqttHandlerService#subscribe

 /**
     * 订阅
     */
    @Override
    public void subscribe(Channel channel, MqttSubscribeMessage mqttSubscribeMessage) {
        Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription ->
                mqttTopicSubscription.topicName()
        ).collect(Collectors.toSet());
        mqttChannelService.suscribeSuccess(mqttChannelService.getDeviceId(channel), topics);
        subBack(channel, mqttSubscribeMessage, topics.size());
    }
    
    
    /**
     * 订阅成功后 (发送保留消息)
     */
    public void suscribeSuccess(String deviceId, Set<String> topics){
        doIfElse(topics,topics1->!CollectionUtils.isEmpty(topics1),strings -> {
            MqttChannel mqttChannel = mqttChannels.get(deviceId);
            mqttChannel.setSubStatus(SubStatus.YES); // 设置订阅主题标识
            mqttChannel.addTopic(strings);
            executorService.execute(() -> {
                Optional.ofNullable(mqttChannel).ifPresent(mqttChannel1 -> {
                    if(mqttChannel1.isLogin()){
                        strings.parallelStream().forEach(topic -> {
                            addChannel(topic,mqttChannel);
                            sendRetain(topic,mqttChannel); // 发送保留消息
                        });
                    }
                });
            });
        });
    }

 4、推送消息给订阅者

遍历所有的channel,根据不同的QOS进行转发。

这里的channel也做了使用一个map进行保存

protected  static  Cache<String, Collection<MqttChannel>> mqttChannelCache = CacheBuilder.newBuilder().maximumSize(100).build();
 com.lxr.iot.bootstrap.channel.MqttChannelService#push
 /**
     * 推送消息给订阅者
     */
    private  void push(String topic, MqttQoS qos, byte[] bytes, boolean isRetain){
        Collection<MqttChannel> subChannels = getChannels(topic, topic1 -> cacheMap.getData(getTopic(topic1)));
        if(!CollectionUtils.isEmpty(subChannels)){
            subChannels.parallelStream().forEach(subChannel -> {
                switch (subChannel.getSessionStatus()){
                    case OPEN: // 在线
                        if(subChannel.isActive()){ // 防止channel失效  但是离线状态没更改
                            switch (qos){
                                case AT_LEAST_ONCE:
                                    sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,subChannel,topic,bytes);
                                    break;
                                case AT_MOST_ONCE:
                                    sendQos0Msg(subChannel.getChannel(),topic,bytes);
                                    break;
                                case EXACTLY_ONCE:
                                    sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,subChannel,topic,bytes);
                                    break;
                            }
                        }
                        else{
                            if(!subChannel.isCleanSession() & !isRetain){
                                clientSessionService.saveSessionMsg(subChannel.getDeviceId(),
                                        SessionMessage.builder().byteBuf(bytes).qoS(qos).topic(topic).build() );
                                break;
                            }
                        }
                        break;
                    case CLOSE: // 连接 设置了 clean session =false
                        clientSessionService.saveSessionMsg(subChannel.getDeviceId(),
                                SessionMessage.builder().byteBuf(bytes).qoS(qos).topic(topic).build() );
                        break;
                }
            });
        }
    }

5、QOS 机制

5.1 QOS是什么

可靠的消息传递

MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。

  • QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
  • QoS 1:消息至少传送一次。
  • QoS 2:消息只传送一次。

5.2 QOS的实现原理

PublishApiSevice

1.2.1、QOS中1和2 需要确认,这里做了一个缓存

channel代表会话

@Getter
@Setter
public class MqttChannel {

    private transient  volatile  Channel channel;

    private String deviceId;

    private boolean isWill;

    private volatile SubStatus subStatus; // 是否订阅过主题

    private  Set<String> topic  ;

    private volatile SessionStatus sessionStatus;  // 在线 - 离线

    private volatile boolean cleanSession; // 当为 true 时 channel close 时 从缓存中删除  此channel

    // messageId - message(qos1)  // 待确认消息
    private ConcurrentHashMap<Integer,SendMqttMessage>  message ;

    
    
    private  AtomicInteger index ;

看下消息的定义

/**
 * mqtt 消息

 **/
@Builder
@Data
public class SendMqttMessage {

    private int messageId;

    private Channel channel;

    private volatile ConfirmStatus confirmStatus;

    private long time;

    private byte[]  byteBuf;

    private boolean isRetain;

    private MqttQoS qos;

    private String topic;

}
/**
 * 确认状态
 *
 **/
public enum ConfirmStatus {
    PUB,
    PUBREC,
    PUBREL,
    COMPLETE,
}
public enum MqttQoS {
    AT_MOST_ONCE(0),
    AT_LEAST_ONCE(1),
    EXACTLY_ONCE(2),
    FAILURE(0x80);

    private final int value;

    MqttQoS(int value) {
        this.value = value;
    }

    public int value() {
        return value;
    }

    public static MqttQoS valueOf(int value) {
        for (MqttQoS q: values()) {
            if (q.value == value) {
                return q;
            }
        }
        throw new IllegalArgumentException("invalid QoS: " + value);
    }
}

这里面有几个针对QOS的字段

messageId 是消息的唯一Id

ConfirmStatus 是消息的状态

MqttQoS 是消息确认状态的枚举

5.3 发送流程

protected void sendQosConfirmMsg(MqttQoS qos, MqttChannel mqttChannel, String topic, byte[] bytes) {
        if(mqttChannel.isLogin()){
            int messageId = mqttChannel.messageId();
            switch (qos){
                case AT_LEAST_ONCE:
                    mqttChannel.addSendMqttMessage(messageId,sendQos1Msg(mqttChannel.getChannel(),topic,false,bytes,messageId));
                    break;
                case EXACTLY_ONCE:
                    mqttChannel.addSendMqttMessage(messageId,sendQos2Msg(mqttChannel.getChannel(),topic,false,bytes,messageId));
                    break;
            }
        }
    }

待客户端响应之后,修改message的ConfirmStatus

/**
     * 消息回复确认(qos1 级别 保证收到消息  但是可能会重复)
     */
    @Override
    public void puback(Channel channel, MqttMessage mqttMessage) {
        MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        int messageId = messageIdVariableHeader.messageId();
        Optional.ofNullable(mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId))
                .ifPresent(msg->msg.setConfirmStatus(ConfirmStatus.COMPLETE)); // 复制为空
        messageTransfer.removeQueue(channel,messageId);
    }

待状态都确认完成之后,移除消息

MQTT QoS 0, 1, 2 介绍 | EMQ

6、reatain机制

发布者发布消息时,如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(Retained Message)。MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。

AbstractChannelService

// topic - 保留消息
protected  static  ConcurrentHashMap<String,ConcurrentLinkedQueue<RetainMessage>> retain = new ConcurrentHashMap<>(); 

下面的代码将retainMessage加入到缓存中

/**
     * 保存保留消息
     * @param topic 主题
     * @param retainMessage 信息
     */
    private void saveRetain(String topic, RetainMessage retainMessage, boolean isClean){
        ConcurrentLinkedQueue<RetainMessage> retainMessages = retain.getOrDefault(topic, new ConcurrentLinkedQueue<>());
        if(!retainMessages.isEmpty() && isClean){
            retainMessages.clear();
        }
        boolean flag;
        do{
            flag = retainMessages.add(retainMessage);
        }
        while (!flag);
        retain.put(topic, retainMessages);
    }

订阅成功后发送retain消息

/**
     * 订阅成功后 (发送保留消息)
     */
    public void suscribeSuccess(String deviceId, Set<String> topics){
        doIfElse(topics,topics1->!CollectionUtils.isEmpty(topics1),strings -> {
            MqttChannel mqttChannel = mqttChannels.get(deviceId);
            mqttChannel.setSubStatus(SubStatus.YES); // 设置订阅主题标识
            mqttChannel.addTopic(strings);
            executorService.execute(() -> {
                Optional.ofNullable(mqttChannel).ifPresent(mqttChannel1 -> {
                    if(mqttChannel1.isLogin()){
                        strings.parallelStream().forEach(topic -> {
                            addChannel(topic,mqttChannel);
                            sendRetain(topic,mqttChannel); // 发送保留消息
                        });
                    }
                });
            });
        });
    }

总结:给还没上线的人留言

7、遗嘱消息

遗嘱消息是 MQTT 为那些可能出现意外断线的设备提供的将遗嘱优雅地发送给其他客户端的能力。设置了遗嘱消息消息的 MQTT 客户端异常下线时,MQTT 服务器会发布该客户端设置的遗嘱消息。

  • 当设备意外断线时,遗嘱消息将被发送至遗嘱 Topic;
   public void doSend( String deviceId) {  // 客户端断开连接后 开启遗嘱消息发送
        if(StringUtils.isNotBlank(deviceId)&&(willMeaasges.get(deviceId))!=null){
            WillMeaasge willMeaasge = willMeaasges.get(deviceId);
            channelService.sendWillMsg(willMeaasge); // 发送遗嘱消息
            if(!willMeaasge.isRetain()){ // 移除
                willMeaasges.remove(deviceId);
                log.info("deviceId will message["+willMeaasge.getWillMessage()+"] is removed");
            }
        }
    }

总结:给还在等消息的人留言

8、Clean Session

为 false 时表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。为 true 时表示创建一个新的临时会话,在客户端断开时,会话自动销毁。

持久会话避免了客户端掉线重连后消息的丢失,并且免去了客户端连接后重复的订阅开销。这一功能在带宽小,网络不稳定的物联网场景中非常实用。

MqttChannel

private volatile boolean cleanSession; // 当为 true 时 channel close 时 从缓存中删除  此channel

9、cleint id

如果客户端使用一个重复的 Client ID 连接至服务器,将会把已使用该 Client ID 连接成功的客户端踢下线。

10、连接超时(Connect Timeout)

连接超时时长,收到服务器连接确认前的等待时间,等待时间内未收到连接确认则为连接失败。

AbsMqttProducer

  protected   void  connectTo(ConnectOptions connectOptions){
        checkConnectOptions(connectOptions);
        if(this.nettyBootstrapClient ==null){
            this.nettyBootstrapClient = new NettyBootstrapClient(connectOptions);
        }
        this.channel =nettyBootstrapClient.start();
        initPool(connectOptions.getMinPeriod());
        try {
            countDownLatch.await(connectOptions.getConnectTime(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("InterruptedException",e);
            nettyBootstrapClient.doubleConnect(); // 重新连接
        }
    }

11、总结

在工作中一直使用emqx,但是不知道业务原理,虽然emqx是开源的,但是因为开发语言是erlang,也不好下手去读,在网上随便找了一个开源的实现,代码很老,但是基本的功能属性都有

https://github.com/1ssqq1lxr/iot_push

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1072463.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

节日灯饰灯串灯出口欧洲CE认证检测

灯串&#xff08;灯带&#xff09;&#xff0c;这个产品的形状就象一根带子一样&#xff0c;再加上产品的主要原件就是LED&#xff0c;因此叫做灯串或者灯带。2022年&#xff0c;我国灯具及相关配件产品出口总额超过460亿美元。其中北美是最大的出口市场。其次是欧洲市场&#…

传奇开服教程GOM传奇引擎外网全套架设教程

传奇开服教程&#xff1a;GOM引擎外网架设教程 准备工具&#xff1a;版本&#xff0c;DBC数据库&#xff0c;传奇客户端&#xff0c;服务器&#xff0c;备案域名 架设传奇外网GOM引擎版本之前我们连接登录服务器&#xff0c;我们把版本&#xff0c;DBC数据库&#xff0c;传奇…

会议邀请 | 思腾合力邀您共赴PRCV 2023第六届中国模式识别与计算机视觉大会

第六届中国模式识别与计算机视觉大会&#xff08;The 6th Chinese Conference on Pattern Recognition and Computer Vision, PRCV 2023&#xff09;将于2023年10月13日至15日在厦门举办。PRCV 2023由中国计算机学会&#xff08;CCF&#xff09;、中国自动化学会&#xff08;CA…

Java8实战-总结38

Java8实战-总结38 默认方法概述默认方法默认方法的使用模式可选方法行为的多继承 默认方法 概述默认方法 默认方法是Java 8中引入的一个新特性&#xff0c;希望能借此以兼容的方式改进API。现在&#xff0c;接口包含的方法签名在它的实现类中也可以不提供实现。缺失的方法实现…

静电除尘器的工作原理及使用说明

静电除尘器是一种通过静电场将空气中的颗粒物带电并吸附到电极上&#xff0c;再利用机械振打或气流将颗粒物从电极上清除的空气净化设备。以下是静电除尘器的工作原理及使用说明&#xff1a; 工作原理&#xff1a; 静电除尘器主要由电极系统、电源系统、收尘系统、清灰系统等…

计算机的分类

文章目录 前言一、超级计算机二、大型计算机三、迷你计算机&#xff08;服务器&#xff09;四、工作站五、微型计算机 前言 世界上所有的计算机总共分为五类&#xff1a;超级计算机、大型计算机、迷你计算机、工作站、微型计算机。今天就简单介绍下各自特点和用途。 一、超级计…

allegro pcb designer铜皮合并

前提&#xff0c;两块铜皮是同一个网络 现在是没有合并的状态 第一步选中两块铜皮 点击sharpe菜单&#xff0c;点击merge shape子菜单&#xff0c;两块铜皮就合并了。

字符输入转换流字符输出转换流

字符输入转换流&字符输出转换流 字符输入转换流&字符输出转换流 package newTest;import java.io.*;public class test2 {//目标&#xff1a; 掌握字符输入转换流public static void main(String[] args) {try(//文件管道对象//得到原始的字节编码InputStream fsnew Fi…

2023-2024年云赛道模拟题库

2023-2024年云赛道模拟题库上线啦&#xff0c;全面覆盖云计算&#xff0c;云服务&#xff0c;大数据和人工智能考点&#xff0c;都是带有解析&#xff0c;实时更新&#xff0c;永久使用 参赛对象及要求&#xff1a; 参赛对象&#xff1a;现有华为ICT学院及未来有意愿成为华为…

一个CPU是怎么寻址的?

目录 CISC vs RISC 概念和历史 CISC vs RISC 对比举例&#xff1a;X86的CAS(做原子操作的) 对比举例&#xff1a;ARM的CAS(做原子操作的) 指令寻址 指令中的操作数的寻址方式 各语言对象内存布局对比 C内存布局 理解编译单元 Java对象内存布局 python对象模型 CPU …

Linux 基金会分叉 Terraform,正式推出 OpenTofu

导读Linux 基金会宣布推出 OpenTofu&#xff0c;这是一个 Terraform 的开源替代方案&#xff0c;并且分叉自 Terraform。OpenTofu 原名 OpenTF&#xff0c;为所有人提供了一个在中立治理模式下的可靠的开源替代方案。 Terraform 是 HashiCorp 开源的一个安全和高效的用来构建、…

JavaScript中的深拷贝(deep copy)和浅拷贝(shallow copy)

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

突发!该国教育部将MDPI、Hindawi和Frontiers三大出版商打包“拉黑”了!

最近&#xff0c;基于对学术诚信和作者署名的特别关切&#xff0c;马来西亚大学教育部发布了一项声明&#xff0c;禁止该国的公立大学使用政府预算来支付在MDPI、Hindawi和Frontiers三家学术出版商旗下的所有期刊上发表论文的费用。 马来西亚大学教育部还成立了一个特别委员会…

MyCat-web安装文档:安装Zookeeper、安装Mycat-web

安装Zookeeper A. 上传安装包 zookeeper-3.4.6.tar.gzB. 解压 #解压到当前目录&#xff0c;之后会生成一个安装后的目录 tar -zxvf zookeeper-3.4.6.tar.gz#加上-c 代表解压到指定目录 tar -zxvf zookeeper-3.4.6.tar.gz -C /usr/local/C. 在安装目录下&#xff0c;创建数据…

Attention Is All You Need(中文版)

目录 1 简介 2 背景 3 模型结构 3.1 编码器和解码器 3.2 注意力机制 3.2.1 缩放的点积注意力机制 3.2.2 多头注意力机制 3.2.3 Transformers中的注意力机制 3.3 基于位置的前馈神经网络 3.4 词嵌入和 softmax 3.5 位置编码 4 为什么选择自注意力机制 5 训练 5.1 硬件和时间 5.2…

深眸科技自研AI视觉分拣系统,实现物流行业无序分拣场景智慧应用

在机器视觉应用环节中&#xff0c;物体分拣是建立在识别、检测之后的一个环节&#xff0c;通过机器视觉系统对图像进行处理&#xff0c;并结合机械臂的使用实现产品分类。 通过引入视觉分拣技术&#xff0c;不仅可以实现自动化作业&#xff0c;还能提高生产线的生产效率和准确…

java案例25:批量操作文件管理器

思路&#xff1a; 编写文件管理器&#xff0c;实现文件的批量操作。具体功能&#xff1a; 1.用户输入指令1&#xff0c;代表“指定关键字检索文件”&#xff0c; 此时需要用户输入检索的目录和关键字&#xff0c; 系统在用户指定的目录下检索出文件名中包含关键字的文件 并将其…

抢先知:公抓抓 信息挖掘工具

随着经济全球化进程的加速&#xff0c;企业在不断发展和壮大&#xff0c;同时也在不断地适应市场的变化。在这个过程中&#xff0c;企业信息的及时获取和掌握变得至关重要。那么&#xff0c;最新企业信息哪里找呢&#xff1f;在这里介绍几个路径&#xff0c;可以参考&#xff0…

蓝桥杯每日一题2023.10.9

题目描述 成绩统计 - 蓝桥云课 (lanqiao.cn) 题目分析 学会使用四舍五入函数round #include<bits/stdc.h> using namespace std; int s1, s2; int main() {int n, x;cin >> n;for(int i 1; i < n; i ){cin >> x; if(x > 60)s1 ;if(x > 85)s2 ;…

GD32F103 硬件SPI通信

1. SPI的通信原理 SPI既可以做主机也可以做从机。 当做主机时。MOSI&#xff0c;SCK,CS都是作为输出。 而作为从机时。MOSI&#xff0c;SCK,CS都是作为输入。 所以SPI的硬件电路应该实现这样的功能。 2. GD32/STM32的SPI框图 1. GD32框图 如下图做主机的数据流向&#xf…