MQTT宝典

news2024/11/27 8:23:57

文章目录

    • 1.介绍
    • 2.发布和订阅
    • 3.MQTT 数据包结构
    • 4.Demo
    • 5.EMQX

1.介绍

什么是MQTT协议
MQTT(消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上。

MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

特点
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。

MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。

MQTT 与 HTTP 一样,MQTT 运行在传输控制协议/互联网协议 (TCP/IP) 堆栈之上。

2.发布和订阅

MQTT使用的发布/订阅消息模式,它提供了一对多的消息分发机制,从而实现与应用程序的解耦。

这是一种消息传递模式,消息不是直接从发送器发送到接收器(即点对点),而是由MQTT server(或称为 MQTT Broker)分发的。

在这里插入图片描述
MQTT 服务器是发布-订阅架构的核心。

它可以非常简单地在Raspberry Pi或NAS等单板计算机上实现,当然也可以在大型机或 Internet 服务器上实现。

服务器分发消息,因此必须是发布者,但绝不是订阅者!

客户端可以发布消息(发送方)、订阅消息(接收方)或两者兼而有之。

客户端(也称为节点)是一种智能设备,如微控制器或具有 TCP/IP 堆栈和实现 MQTT 协议的软件的计算机。

消息在允许过滤的主题下发布。主题是分层划分的 UTF-8 字符串。不同的主题级别用斜杠/作为分隔符号。

EG:
在这里插入图片描述
在这里插入图片描述
QoS(Quality of Service levels)
服务质量是 MQTT 的一个重要特性。当我们使用 TCP/IP 时,连接已经在一定程度上受到保护。但是在无线网络中,中断和干扰很频繁,MQTT 在这里帮助避免信息丢失及其服务质量水平。这些级别在发布时使用。如果客户端发布到 MQTT 服务器,则客户端将是发送者,MQTT 服务器将是接收者。当MQTT服务器向客户端发布消息时,服务器是发送者,客户端是接收者。

  • QoS 0 : 这一级别会发生消息丢失或重复,消息发布依赖于底层TCP/IP网络。即:<=1
    在这里插入图片描述
  • QoS 1 : 承诺消息将至少传送一次给订阅者。
    在这里插入图片描述
  • QoS 2 : 我们保证消息仅传送到目的地一次。为此,带有唯一消息 ID 的消息会存储两次,首先来自发送者,然后是接收者。QoS 级别 2 在网络中具有最高的开销,因为在发送方和接收方之间需要两个流。
    在这里插入图片描述

3.MQTT 数据包结构

  • 固定头(Fixed header),存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识;

  • 可变头(Variable header),存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容;

  • 消息体(Payload),存在于部分MQTT数据包中,表示客户端收到的具体内容;

整体MQTT的消息格式如下图所示:
在这里插入图片描述

MQTT固定头
固定头存在于所有MQTT数据包中,其结构如下:
在这里插入图片描述
固定头的消息格式

  • 消息类型 / message type
    位置: byte 1, bits 7-4
    4位的无符号值,类型如下:
    在这里插入图片描述
  • 标识位 / DUP/RET
    位置: byte 1, bits 3-0。

在不使用标识位的消息类型中,标识位被作为保留位。如果收到无效的标志时,接收端必须关闭网络连接:

在这里插入图片描述
DUP:发布消息的副本。用来在保证消息的可靠传输,如果设置为 1,则在下面的变长中增加MessageId,并且需要回复确认,以保证消息传输完成,但不能用于检测消息重复发送。

QoS发布消息的服务质量,即:保证消息传递的次数

在这里插入图片描述
RETAIN:发布保留标识,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它,如果设有那么推送至当前订阅者后释放。

  • 剩余长度

位置:byte 1

固定头的第二字节用来保存变长头部和消息体的总大小的,但不是直接保存的。这一字节是可以扩展,其保存机制,前7位用于保存长度,后一部用做标识。当最后一位为 1时,表示长度不足,需要使用二个字节继续保存。例如:计算出后面的大小为0

MQTT可变头 / Variable header

MQTT数据包中包含一个可变头,它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同,较常的应用是做为包的标识:
在这里插入图片描述
很多类型数据包中都包括一个2字节的数据包标识字段,这些类型的包有:

PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、

SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK

Payload消息体
Payload消息体是MQTT数据包的第三部分,CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息 有消息体:

  • CONNECT,消息体内容主要是:客户端的ClientID、订阅的Topic、Message以及用户名和密码

  • SUBSCRIBE,消息体内容是一系列的要订阅的主题以及QoS。

  • SUBACK,消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。

  • UNSUBSCRIBE,消息体内容是要订阅的主题。

4.Demo

  • DEMO1
 public static void main(String[] args) {
        String broker = "tcp://172.168.1.122:2314";
        String clientId = "JavaSample";
        //Use the memory persistence
        MemoryPersistence persistence = new MemoryPersistence();


        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("broker:" + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");

            String topic = "demo/topics";
            System.out.println("Subscribe to topic:" + topic);
            sampleClient.subscribe(topic);//订阅主题
            sampleClient.setCallback(new MqttCallback() {
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic);
                    System.out.println(theMsg);
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                }

                public void connectionLost(Throwable throwable) {
                }
            });


            String content = "Message from MqttPublishSample";
            int qos = 2;
            System.out.println("Publishing message:" + content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);//发布消息
            System.out.println("Message published");

        } catch (MqttException me) {
            System.out.println("reason" + me.getReasonCode());
            System.out.println("msg" + me.getMessage());
            System.out.println("loc" + me.getLocalizedMessage());
            System.out.println("cause" + me.getCause());
            System.out.println("excep" + me);
            me.printStackTrace();
        }
    }

5.EMQX

  • 本地搭建EMQX服务
    1、下载emqx压缩文件:wget https://www.emqx.com/zh/downloads/broker/5.0.3/emqx-5.0.3-el7-amd64.tar.gz
    2、解压文件:mkdir -p emqx && tar -zxvf emqx-5.0.3-el7-amd64.tar.gz -C emqx
    3、启动服务:./emqx/bin/emqx start

在这里插入图片描述

  • 整合使用

在这里插入图片描述

配置

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>

        <!-- MQTT -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
# Mqtt配置
mqtt:
  #我是在本地搭建了emqx服务,192.168.2.31就是我本地emqx服务的地址。也可用公共测试不需要搭建emqx服务,将ip改为broker.emqx.io即可。
  serverURIs: tcp://192.168.111.5:1883
  username: #可不填写
  password:  #可不填写
  qos: 2 #等级 有 0 1 2 三种
  clientId: mqttxxx
  topic: testTopic  #订阅的主题,多个时可以使用逗号分开 如:topic1,topic2,topic
  enabled: true  #是否打开mqtt服务
  keepalive: 100 #心跳时间  不需要动
  timeout: 100 # 超时时间秒 不需要动
@Configuration
public class MqttConfig {

    @Autowired
    private MqttPushClient mqttPushClient;

    @Value("${mqtt.username:{null}}")
    private String username;

    @Value("${mqtt.password:{null}}")
    private String password;

    @Value("${mqtt.serverURIs:{null}}")
    private String hostUrl;

    @Value("${mqtt.clientId:{null}}")
    private String clientId;

    @Value("${mqtt.topic:{null}}")
    private String defaultTopic;

    @Value("${mqtt.qos:{null}}")
    private int qos;

    @Value("${mqtt.enabled:{null}}")
    private boolean enabled;

    @Value("${mqtt.keepalive:{null}}")
    private int keepalive;

    @Value("${mqtt.timeout:{null}}")
    private int timeout;

    //订阅主体
    @Bean
    public MqttPushClient getMqttPushClient() {
        if(enabled == true){
            String mqtt_topic[] = defaultTopic.split(",");
            mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接
            for(int i=0; i<mqtt_topic.length; i++){
                mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题
            }
        }
        return mqttPushClient;
    }

    //发送消息到对应主题
    @Bean
    public MqttPushClient pushMessage() {
        mqttPushClient.publish(0,true,"wsy","呵呵哈哈哈 你好呀");
        return mqttPushClient;
    }
    
}
@Component
public class MqttPushClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;

    private static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param clientID  客户端Id
     * @param username  用户名
     * @param password  密码
     * @param timeout   超时时间
     * @param keepalive 保留数
     */
    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
        MqttClient client;
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            MqttPushClient.setClient(client);
            try {
                client.setCallback(pushCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布
     *
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     */
    public boolean publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            logger.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
            return true;
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
            return false;
        } catch (MqttException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   连接方式
     */
    public void subscribe(String topic, int qos) {
        logger.info("开始订阅主题" + topic);
        try {
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}
@Component
public class PushCallback implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private MqttConfig mqttConfig;

    private static MqttClient client;

    private static String _topic;
    private static String _qos;
    private static String _msg;

    @Override
    public void connectionLost(Throwable throwable) {
        // 连接丢失后,一般在这里面进行重连
        logger.info("连接断开,可以做重连");
        if (client == null || !client.isConnected()) {
            mqttConfig.getMqttPushClient();
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // subscribe后得到的消息会执行到这里面
        logger.info("接收消息主题 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));

        _topic = topic;
        _qos = mqttMessage.getQos()+"";
        _msg = new String(mqttMessage.getPayload());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    //别的Controller层会调用这个方法来  获取  接收到的硬件数据
    public String receive() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topic", _topic);
        jsonObject.put("qos", _qos);
        jsonObject.put("msg", _msg);
        return jsonObject.toString();
    }
}

注意:
clientId不能重复,若存在重复的 clientId 连接,会导致争抢而连接不上。(就是存在两个客户端 clientId 相同,两个在打架,一直争同一连接,导致一直重连,重复发布消息)

发布:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

订阅:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

参考资料

https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/877483.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

opencv实战项目 手势识别-手势音量控制(opencv)

本项目是使用了谷歌开源的框架mediapipe&#xff0c;里面有非常多的模型提供给我们使用&#xff0c;例如面部检测&#xff0c;身体检测&#xff0c;手部检测等。 手势识别系列文章 1.opencv实现手部追踪&#xff08;定位手部关键点&#xff09; 2.opencv实战项目 实现手势跟踪…

探索 C++ 标准库:std::string 库函数用法示例

目录 引言 一、构造函数 1.1 string() 1.2 string (const string& str) 1.3 string (const string& str, size_t pos, size_t len npos) 1.4 string (const char* s) 1.5 string (const char* s, size_t n) 1.6 string (size_t n, char c&#xff09;​ 二、容…

剑指offer11-20

文章目录 11.旋转数组的最小数字12.矩阵中的路径13.机器人的运动范围15.二进制中1的个数16.数值的整数次方17.打印从1到最大的n位数&#xff08;待写&#xff09;18.删除链表的节点19.正则表达式匹配&#xff08;好难&#xff09;20. 没意义算了 11.旋转数组的最小数字 肯定不是…

【Git】—— 标签管理

目录 &#xff08;一&#xff09;理解标签 1、作用 &#xff08;二&#xff09;创建标签 &#xff08;三&#xff09;操作标签 1、删除标签 2、推送标签 3、删除远程标签 &#xff08;一&#xff09;理解标签 标签 tag &#xff0c;可以简单的理解为是对某次 commit 的…

C++11时间日期库chrono的使用

chrono是C11中新加入的时间日期操作库&#xff0c;可以方便地进行时间日期操作&#xff0c;主要包含了&#xff1a;duration, time_point, clock。 时钟与时间点 chrono中用time_point模板类表示时间点&#xff0c;其支持基本算术操作&#xff1b;不同时钟clock分别返回其对应…

Jenkins 监控dist.zip文件内容发生变化 触发自动部署

为Jenkins添加plugin http://xx:xx/manage 创建一个任务 构建触发器 每3分钟扫描一次&#xff0c;发现指定文件build.zip文件的MD5发生变化后 触发任务

脚本一键生成通用接口,一分钟实现增删改查

直接使用无需看此配置 快速生成通用接口业务配置 &#xff1a; https://blog.zysicyj.top/2023/08/14/快速生成通用接口业务配置 一、插件安装 二、脚本 关注绿色聊天软件【程序员朱永胜】回复&#xff1a;1013 下载 三、使用 拷贝到扩展目录下 修改mybatisCodehelper.vm 修改i…

【爱书不爱输的程序猿】CPOLAR+HFS,低成本搭建NAS

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 通过HFS低成本搭建NAS&#xff0c;并内网穿透实现公网访问 - cpolar 极点云 前言1.下载安装cpolar1.1 设置HFS访客1.2 虚拟文件系统 2. 使用cpolar建立一条内网穿透数据隧道2.1 保留…

强化学习 PPO算法和代码

PPO 效果 字体找不到 ubuntu python findfont: Font family ‘Alibaba PuHuiTi 3.0’ not found. shell 清除缓存&#xff1a; rm ~/.cache/matplotlib -rf到这里下载 阿里巴巴普惠体3.0 https://fonts.alibabagroup.com/ 然后安装字体 PPO import matplotlib from mat…

​​C++多态​​

目录 1. 多态的概念 2. 多态的定义及实现 多态的构成条件 虚函数 虚函数的重写 特例 override 和 final 1. final&#xff1a;修饰虚函数&#xff0c;表示该虚函数不能再被重写 2.override: 检查派生类虚函数是否重写了基类某个虚函数&#xff0c;如果没有重写编译报错…

【数据结构】二叉树篇|超清晰图解和详解:二叉树的最近公共祖先

博主简介&#xff1a;努力学习的22级计算机科学与技术本科生一枚&#x1f338;博主主页&#xff1a; 是瑶瑶子啦每日一言&#x1f33c;: 你不能要求一片海洋&#xff0c;没有风暴&#xff0c;那不是海洋&#xff0c;是泥塘——毕淑敏 目录 一、题目二、题解三、代码 一、题目 …

Stable Diffusion +EbSynth应用实践和经验分享

Ebsynth应用 1.安装ffmpeg 2.安装pip install transparent-background,下载模型https://www.mediafire.com/file/gjvux7ys4to9b4v/latest.pth/file 放到C:\Users\自己的用户名.transparent-background\加一个ckpt_base.pth文件 3.秋叶安装ebsynth插件,重启webui 填写项目基本…

线段树-模板-区间查询-区间修改

【模板】线段树 2 传送门&#xff1a;https://www.luogu.com.cn/problem/P3373 题单&#xff1a;https://www.luogu.com.cn/training/16376#problems 题目描述 如题&#xff0c;已知一个数列&#xff0c;你需要进行下面三种操作&#xff1a; 将某区间每一个数乘上 x x x&a…

FPGA学习——驱动WS2812光源并进行动态显示

文章目录 一、WS2812手册分析1.1 WS2812灯源特性及概述1.2 手册重点内容分析1.2.1 产品概述1.2.2 码型及24bit数据设计 二、系统设计2.1 模块设计2.2 模块分析2.2.1 驱动模块2.2.1 数据控制模块 三、IP核设置及项目源码3.1 MIF文件设计3.2 ROM IP核调用3.3 FIFO IP核调用3.4 项…

机器学习-特征选择:如何使用递归特征消除算法自动筛选出最优特征?

一、引言 在实际应用中&#xff0c;特征选择作为机器学习和数据挖掘领域的重要环节&#xff0c;对于提高模型性能和减少计算开销具有关键影响。特征选择是从原始特征集中选择最相关和最具区分力的特征子集&#xff0c;以提高模型的泛化能力和可解释性。 特征选择在实践中具有以…

算法笔试 java 输入输出练习

在线编程题刷题训练 所有答案 scancer函数的用法 输入输出总结top&#xff01;&#xff01;&#xff01;&#xff01; java如何调用函数&#xff08;方法&#xff09; java刷acm的各种输入输出 vscode配置java环境 子函数的调用&#xff0c;直接定义一个static子函数调用就…

人工智能任务1-【NLP系列】句子嵌入的应用与多模型实现方式

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下人工智能任务1-【NLP系列】句子嵌入的应用与多模型实现方式。句子嵌入是将句子映射到一个固定维度的向量表示形式&#xff0c;它在自然语言处理&#xff08;NLP&#xff09;中有着广泛的应用。通过将句子转化为向量…

ARM(汇编指令)

.global _start _start:/*mov r0,#0x5mov r1,#0x6 bl LoopLoop:cmp r0,r1beq stopsubhi r0,r0,r1subcc r1,r1,r0mov pc,lr*/ mov r0,#0x1mov r1,#0x0mov r2,#0x64bl Loop Loop:cmp r0,r2bhi stopadd r1,r1,r0add r0,r0,#0x01mov pc,lr stop:B stop.end

Android Ble蓝牙App(五)数据操作

Ble蓝牙App&#xff08;五&#xff09;数据操作 前言正文一、操作内容处理二、读取数据① 概念② 实操 三、写入数据① 概念② 实操 四、打开通知一、概念二、实操三、收到数据 五、源码 前言 关于低功耗蓝牙的服务、特性、属性、描述符都已经讲清楚了&#xff0c;而下面就是使…

vue自定义指令动态绑定

在企业微信侧边栏应用中&#xff0c;给dialog添加了拖拽功能&#xff0c;但是因为dialog高度超过了页面高度&#xff0c;所以高度100%时拖拽有个bug--自动贴到窗口顶部而且企业侧边栏宽高都有限制&#xff0c;拖拽效果并不理想&#xff0c;所以就想缩小dialog再进行拖拽。 拖拽…