Java实践-物联网loT入门-MQTT传输协议

news2025/1/17 6:14:44

前言

MQTT是一个极其轻量级发布/订阅消息传输协议,适用于网络带宽较低的场合.

通过一个代理服务器(broker),任何一个客户端(client)都可以订阅或者发布某个主题的消息,然后订阅了该主题的客户端则会收到该消息

业务场景

硬件采集的数据传入EMQX平台(采用MQTT协议),java通过代码连接MQTT服务器,进行采集数据接收、解析、业务处理、存储入库、数据展示。

MQTT 是基于 发布(Publish)/订阅(Subscribe) 模式来进行通信及数据交换的。

什么是MQTT

MQTT是基于二进制消息的发布/订阅编程模式的消息协议,最早由IBM提出的,如今已经成为OASIS规范。由于规范很简单,非常适合需要低功耗和网络带宽有限的IoT场景,比如:

  • 遥感数据
  • 汽车
  • 智能家居
  • 智慧城市
  • 医疗医护

由于物联网的环境是非常特别的,所以MQTT遵循以下设计原则:

  1. 精简,不添加可有可无的功能。
  2. 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递。
  3. 允许用户动态创建主题,零运维成本。
  4. 把传输量降到最低以提高传输效率。
  5. 把低带宽、高延迟、不稳定的网络等因素考虑在内。
  6. 支持连续的会话控制。
  7. 理解客户端计算能力可能很低。
  8. 提供服务质量管理。
  9. 假设数据不可知,不强求传输数据的类型与格式,保持灵活性。

MQTT与HTTP的区别

首先看一下HTTP请求:

  • HTTP 是一种同步协议。客户端需要等待服务器响应。Web 浏览器具有这样的要求,但它的代价是牺牲了可伸缩性。
  • HTTP 是单向的。客户端必须发起连接。
  • HTTP 是一种 1-1 协议。客户端发出请求,服务器进行响应。将消息传送到网络上的所有设备上,不但很困难,而且成本很高。
  • HTTP 是一种有许多标头和规则的重量级协议。它不适合受限的网络。

再来看一下MQTT: 

        在 IoT 领域,大量设备以及很可能不可靠或高延迟的网络使得同步通信成为问题。异步消息协议更适合 IoT 应用程序。传感器发送读数,让网络确定将其传送到目标设备和服务的最佳路线和时间。在 IoT 应用程序中,设备或传感器通常是客户端,这意味着它们无法被动地接收来自网络的命令。

MQTT的核心: 发布和订阅模型 

MQTT 协议在网络中定义了两种实体类型:一个消息代理和一些客户端。

代理是一个服务器,它从客户端接收所有消息,然后将这些消息路由到相关的目标客户端。

客户端是能够与代理交互来发送和接收消息的任何事物。客户端可以是现场的 IoT 传感器,或者是数据中心内处理 IoT 数据的应用程序。

  1. 客户端连接到代理。它可以订阅代理中的任何消息“主题”。此连接可以是简单的 TCP/IP 连接,也可以是用于发送敏感消息的加密 TLS 连接。
  2. 客户端通过将消息和主题发送给代理,发布某个主题范围内的消息。
  3. 代理然后将消息转发给所有订阅该主题的客户端。

        同时,MQTT 是轻量级的。它有一个用来指定消息类型的简单标头,有一个基于文本的主题,还有一个任意的二进制有效负载。应用程序可对有效负载采用任何数据格式,比如 JSON、XML、加密二进制或 Base64,只要目标客户端能够解析该有效负载。

Java实例

1.通过包管理工具 Maven导入依赖

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.2</version>
</dependency>

 2.编写订阅方的代码,并启动。

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


/**
 * 订阅方
 */
public class SubscribeSample {
    public static void main(String[] args) {
        //EMQ X 默认端口 1883
        String broker = "tcp://broker.emqx.io:1883";
        String TOPIC = "test";
        int qos = 1;
        String clientid = "subClient";
        String userName = "admin";
        String passWord = "password";
        try {
            // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
            MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
            // MQTT的连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(true);
            // 设置连接的用户名
            options.setUserName(userName);
            // 设置连接的密码
            options.setPassword(passWord.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            // 设置回调函数
            client.setCallback(new MqttCallback() {

                public void connectionLost(Throwable cause) {
                    System.out.println("connectionLost");
                }

                public void messageArrived(String topic, MqttMessage message) {
                    System.out.println("======监听到来自[" + topic + "]的消息======");
                    System.out.println("message content:"+new String(message.getPayload()));
                    System.out.println("============");
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete---------"+ token.isComplete());
                }

            });

            // 建立连接
            System.out.println("连接到 broker: " + broker);
            client.connect(options);

            System.out.println("连接成功.");
            //订阅消息
            client.subscribe(TOPIC, qos);
            System.out.println("开始监听" + TOPIC);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

启动订阅方运行结果如下:

 

3.编写发布方的代码并启动

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 发布方
 */
public class PublishSample {
    public static void main(String[] args) {

        String topic = "test";
        String content = "你好,我给你发了条消息呀!!!!!!!!!!!";
        int qos = 1;
        String broker = "tcp://broker.emqx.io:1883";
        String userName = "admin";
        String password = "password";
        String clientId = "pubClient";
        // 内存存储
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            // 创建客户端
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            // 创建链接参数
            MqttConnectOptions connOpts = new MqttConnectOptions();
            // 在重新启动和重新连接时记住状态
            connOpts.setCleanSession(false);
            // 设置连接的用户名
            connOpts.setUserName(userName);
            connOpts.setPassword(password.toCharArray());
            // 建立连接
            System.out.println("连接到 broker: " + broker);
            sampleClient.connect(connOpts);
            System.out.println("连接成功.");
            // 创建消息
            MqttMessage message = new MqttMessage(content.getBytes());
            // 设置消息的服务质量
            message.setQos(qos);
            // 发布消息
            System.out.println("向" + topic + "发送消息:" + message);
            sampleClient.publish(topic, message);
            // 断开连接
            sampleClient.disconnect();
            // 关闭客户端
            sampleClient.close();
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

 

启动发布方运行结果如下:

4.最后查看订阅方的控制台,订阅方收到消息

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/985535.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

红米note5 拆金属外壳

红米note5 拆金属外壳 卡扣式 外壳 屏幕 先拿下来&#xff0c;sim卡的那个卡座。 贴边&#xff0c;到这个卡扣的地方&#xff0c;用工具翘一下&#xff0c;然后下一个卡扣的地方翘一下&#xff0c;然后慢慢的整个的拿下来。 别硬翘&#xff0c;小刀的刀尖&#xff0c;容易给…

jeesite自定义数据字典,自定义字典表,自带树选择数据源(保姆级图文教程)

文章目录 前言一、框架自带树字典表如何使用二、自定义表作为字典表1. 下拉选项使用自建表作为字典表。实际效果框架示例实际开发代码总结前言 项目开发中字典表如果不满足实际需求,比如使用自己的表作为字典,系统自带字典表树如何使用等问题进行总结记录。 一、框架自带树字…

端口扫描-安全体系-网络安全技术和协议

端口扫描-安全体系-网络安全技术和协议 端口扫描信息安全的保证体系和评估方法网络安全技术网络攻击和威胁(重要)网络安全协议 端口扫描 全TCP连接:三次握手 半打开式扫描:前两次握手 FIN扫描:不用建立TCP连接 第三方扫描: 拒绝服务攻击有: 同步包风暴ICMP攻击SNMP攻击 都是修改…

卡牌类游戏推荐,卡牌类三国手游排行榜

以下是小编要推荐给大家的关于卡牌类三国手游排行榜的内容。这里有来自各个历史阶段的名将和美女&#xff0c;让你体验最真实的三国战役。你可以将各种战略思维运用到其中&#xff0c;感受步步为营的喜悦&#xff0c;最终赢得战火纷飞的三国&#xff0c;如果想了解每个游戏的具…

c高级day2 linux指令的补充和shell脚本

思维导图 写一个1.sh脚本&#xff0c;将以下内容放到脚本中&#xff1a; 在家目录下创建目录文件&#xff0c;dir 在dir下创建dir1和dir2 把当前目录下的所有文件拷贝到dir1中&#xff0c; 把当前目录下的所有脚本文件拷贝到dir2中 把dir2打包并压缩为dir2.tar.xz 再把di…

维维数码:网络电视机顶盒怎么样?口碑电视机顶盒排行榜

欢迎各位来到维维数码频道&#xff0c;最近后台收到很多私信咨询我网络电视机顶盒怎么样&#xff0c;值不值得买&#xff1f;当家里是老电视想升级智能电视&#xff0c;或者智能电视使用几年后出现卡顿、资源少、无法下载软件等&#xff0c;只需要买一台网络电视机顶盒就可以解…

Tomcat环境变量配置教程

1、在官网下载并解压Tomcat&#xff0c;记住解压好的目录&#xff0c;后面配置环境需要用到。 官网地址&#xff1a;Apache Tomcat - Welcome! --- 阿帕奇雄猫 - 欢迎&#xff01; 2、右键此电脑&#xff08;我的电脑&#xff09;->属性->高级系统设置->环境变量 3、…

NoSQL之redis持久化(RDB、AOF)

目录 一、Redis高可用 二、Redis持久化 1、持久化的功能 2、Redis的两种持久化 三、RDB 持久化 1、触发条件 1.1 手动触发 1.2 自动触发 1.3 其它自动触发机制 2、执行流程 3、启动时加载RED文件(恢复) 四、Redis的AOF持久化 1、开启AOF 2、执行流程 2.1 命令追加…

SSD202D-boot-IO复用功能

SSD202D-logo分区添加dtb_旋风旋风的博客-CSDN博客 可以参考该博客,当然我为了兼容以前的固件又优化了该思路: 直接把对应的包添加在logo_202d的尾部,这样就不会影响原来的包 这两张就是修改之后的结构,只在尾部追加DTB 其中ABC结构体 //A结构体- (12 size) typedef struc…

elementPlus + table 树形懒加载 新增,删除,修改 局部刷新

#直接上代码# 1.表格数据 2.数据源 <m-table ref"cTable" v-if"Object.keys(props.tableData).length" :options"props.tableOptions" :data"props.tableData.data" :isLoading"props.tableData.loading" elementLo…

数据结构与算法(四):栈与队列

栈与队列 我们一般把栈与队列合在一块讨论&#xff0c;因为他们具有相似的性质。 栈&#xff1a;栈是限定仅在表尾进行插入和删除操作的线性表&#xff0c;所以栈又称为后进先出&#xff08;LastIn First Out&#xff09;的线性表&#xff0c;简称LIFO结构。 队列&#xff1…

【C++二叉树】进阶OJ题

【C二叉树】进阶OJ题 目录 【C二叉树】进阶OJ题1.二叉树的层序遍历II示例代码解题思路 2.二叉搜索树与双向链表示例代码解题思路 3.从前序与中序遍历序列构造二叉树示例代码解题思路 4.从中序与后序遍历序列构造二叉树示例代码解题思路 5.二叉树的前序遍历&#xff08;非递归迭…

C++学习笔记--函数重载(2)

文章目录 1.3、Function Templates Handling1.3.1、Template Argument Deduction1.3.2、Template Argument Substitution 1.4、Overload Resolution1.4.1、Candidate functions1.4.2、Viable functions1.4.3、Tiebreakers 1.5、走一遍完整的流程1.6、Name Mangling1.7、总结 1.…

SpringCloud-Hystrix 服务降级与熔断

接上文SpringCloud-Feign 问题描述 为了解决上述情况&#xff0c;SpringCloud提供了Hystrix熔断器组件&#xff0c;如同保险丝。服务降级则不会直接返回错误&#xff0c;而是提供一个补救措施&#xff0c;正常响应给请求者。 1.服务降级 基于借阅管理服务&#xff0c;不开启…

JavaScript-----个性名片案例展示

目录 前言&#xff1a; 效果展示 代码&#xff1a; html代码 CSS代码 图片资源&#xff1a; 前言&#xff1a; 今天我们就通过刚刚学习的JavaScript知识点以及前面学习了的html和CSS的知识点去做一个小作品&#xff0c;这是一个个性名片的案例&#xff08;有代码资源和图片…

【CSDN技术】Markdown编辑器如何使用-csdn博客编写入门

Markdown编辑器如何使用-csdn博客编写入门 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自…

【脑机接口】基于运动想象的康复指导在脑卒中偏瘫患者中的应用

【摘要】 目的 探讨运动想象康复指导对脑卒中偏瘫患者的康复效果及意义。 方法 将 60例脑卒中偏瘫患者随机分为观察组(n31)和对照组(n29)&#xff0c;对照组的康复训练指导采用讲解示范法&#xff0c;观察组采用运动想象法 。比较两组 患者 的运 动功能 、日常生活 活动能力及 …

spring-data-jpa编程中,方法参数的数据类型不一致引发的问题记录

一、代码结构 domain model BookDistributionRepository.java infrastructure.persistence jpa BookDistributionRepositoryJPA.javaBookDistributionRepositoryJPAImpl.java 1、接口BookDistributionRepository.java public interface BookDistributionRepository {List&…

锯齿波-RC充放电路

锯齿波电路根据应用的不同又叫扫描电路、时基断电路&#xff0c;在一些仪器仪表等电子设备中经常用到的一种单元电路。锯齿波信号的明显的特征是电压或是电流先随时间呈线性增长&#xff0c;再迅速下降&#xff0c;然后再线性上升&#xff0c;再迅速下降&#xff0c;如此循环。…

mysql8 Found option without preceding group错误

这个错误说起来是真的坑&#xff0c;今晚帮同学在window操作系统上安装mysql8当自定义my.ini文件的时候 就出现一下错误&#xff0c;死活启动不起来 一直报错。当删掉这个my.ini文件的时候却能启动&#xff0c;刚开始以为是my.ini里的配置选项不对&#xff0c;一个一个筛查后依…