MQTT快速入门

news2024/11/27 21:00:48

官网文档

前言:

MQTT 是用于物联网连接的 OASIS 标准,它是一种基于发布订阅模式的、轻量级的消息传输协议,专为受限设备和低带宽、高延迟和不可靠的网络设计,并且能够提供一定的消息可靠性保证。得益于这些特性,MQTT 在车联网、工业制造、移动通信等领域广泛应用。

目前 MQTT 的主要版本有 v3.1.1 和 v5.0,v5.0 于 2019 年 3 月发布,相比于 v3.1.1 引入了很多改进和增强,目前市面上绝大部分的客户端 和代理都已经支持了 MQTT v5.0。

MQTT 协议要求基础传输层能够提供有序、可靠的双向传输字节流,所以 TCP 协议通常是 MQTT 的首要选择。

简单理解就是类似于web应用中的MQ消息中间件,MQ更多地是专注于数据的存储和读取,针对实时性高的流式数据处理场景。而 MQTT 则侧重于海量的设备连接和主题路由,并且在不稳定的网络环境中进行实时的、可靠的消息交换。

架构

在这里插入图片描述
从架构图可以看出MQTT大致就是三个部分组成:Broker 、public、subscriber ,发布订阅三件套

快速运用MQTT

搭Broke

  • 运用docker 搭Broker,两条命令搞定
docker pull docker.io/emqx/emqx

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx

–name 名字
1883:MQTT 协议端口
8883:MQTT/SSL 端口
8083:MQTT/WebSocket 端口
8081:HTTP API 端口
18083:Dashboard 管理控制台端口
-d 指定容器

然后你可以登录Broker的控制台,初始的账户 admin, 密码 public ,有了这个可视化界面,剩下的就很好操作了

http://localhost:18083/#/

在这里插入图片描述

  • Maven依赖
<dependencies>
   <dependency>
       <groupId>org.eclipse.paho</groupId>
       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
       <version>1.2.5</version>
   </dependency>
</dependencies>

创建 MQTT 连接

  • MQTT 服务器
    可以使用 EMQX 提供的 免费公共 MQTT 服务器,该服务基于 EMQX 的 MQTT 云平台创建。
    本文使用docker本地服务器接入信息如下:

Broker: broker.emqx.io(免费中国用户可以使用 broker-cn.emqx.io)
TCP Port: 1883
SSL/TLS Port: 8883

  • 普通 TCP 连接
    设置 MQTT Broker 基本连接参数,用户名、密码为非必选参数。

String broker = “tcp://broker.emqx.io:1883”;
// TLS/SSL
// String broker = “ssl://broker.emqx.io:8883”;
String username = “emqx”;
String password = “public”;
String clientid = “publish_client”;

然后创建 MQTT 客户端并连接。

MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
client.connect(options);

说明

  • MqttClient: 同步调用客户端,使用阻塞方法通信。
  • MqttClientPersistence: 代表一个持久的数据存储,用于在传输过程中存储出站和入站的信息,使其能够传递到指定的 QoS。
  • MqttConnectOptions: 连接选项,用于指定连接的参数,下面列举一些常见的方法。
    • setUserName: 设置用户名
    • setPassword: 设置密码
    • setCleanSession: 设置是否清除会话
    • setKeepAliveInterval: 设置心跳间隔
    • setConnectionTimeout: 设置连接超时时间
    • setAutomaticReconnect: 设置是否自动重连

TLS/SSL 连接

如果要使用自签名证书进行 TLS/SSL 连接,需添加 bcpkix-jdk15on 到 pom.xml 文件。

<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->
<dependency>
   <groupId>org.bouncycastle</groupId>
   <artifactId>bcpkix-jdk15on</artifactId>
   <version>1.70</version>
</dependency>

然后使用如下代码创建 SSLUtils.java 文件。

package io.emqx.mqtt;

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

public class SSLUtils {
   public static SSLSocketFactory getSocketFactory(final String caCrtFile,
                                                   final String crtFile, final String keyFile, final String password)
           throws Exception {
       Security.addProvider(new BouncyCastleProvider());

       // load CA certificate
       X509Certificate caCert = null;

       FileInputStream fis = new FileInputStream(caCrtFile);
       BufferedInputStream bis = new BufferedInputStream(fis);
       CertificateFactory cf = CertificateFactory.getInstance("X.509");

       while (bis.available() > 0) {
           caCert = (X509Certificate) cf.generateCertificate(bis);
      }

       // load client certificate
       bis = new BufferedInputStream(new FileInputStream(crtFile));
       X509Certificate cert = null;
       while (bis.available() > 0) {
           cert = (X509Certificate) cf.generateCertificate(bis);
      }

       // load client private key
       PEMParser pemParser = new PEMParser(new FileReader(keyFile));
       Object object = pemParser.readObject();
       JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
       KeyPair key = converter.getKeyPair((PEMKeyPair) object);
       pemParser.close();

       // CA certificate is used to authenticate server
       KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
       caKs.load(null, null);
       caKs.setCertificateEntry("ca-certificate", caCert);
       TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
       tmf.init(caKs);

       // client key and certificates are sent to server so it can authenticate
       KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
       ks.load(null, null);
       ks.setCertificateEntry("certificate", cert);
       ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
               new java.security.cert.Certificate[]{cert});
       KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
              .getDefaultAlgorithm());
       kmf.init(ks, password.toCharArray());

       // finally, create SSL socket factory
       SSLContext context = SSLContext.getInstance("TLSv1.2");
       context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

       return context.getSocketFactory();
  }
}

参照如下设置 options。

// 设置 SSL/TLS 连接地址
String broker = "ssl://127.0.0.1:8883";
// 设置 socket factory
String caFilePath = "/cacert.pem";
String clientCrtFilePath = "/client.pem";
String clientKeyFilePath = "/client.key";
SSLSocketFactory socketFactory = getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, "");
options.setSocketFactory(socketFactory);

发布 MQTT 消息

创建一个发布客户端类 PublishSample,该类将发布一条 Hello MQTT 消息至主题 mqtt/test。

package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PublishSample {

   public static void main(String[] args) {

       String broker = "tcp://127.0.0.1:1883";
       String topic = "mqtt/test";
       String username = "admin";
       String password = "public";
       String clientid = "publish_client";
       String content = "Hello MQTT";
       int qos = 0;

       try {
           MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
           // 连接参数
           MqttConnectOptions options = new MqttConnectOptions();
           // 设置用户名和密码
           options.setUserName(username);
           options.setPassword(password.toCharArray());
           options.setConnectionTimeout(60);
      options.setKeepAliveInterval(60);
           // 连接
           client.connect(options);
           // 创建消息并设置 QoS
           MqttMessage message = new MqttMessage(content.getBytes());
           message.setQos(qos);
           // 发布消息
           client.publish(topic, message);
           System.out.println("Message published");
           System.out.println("topic: " + topic);
           System.out.println("message content: " + content);
           // 关闭连接
           client.disconnect();
           // 关闭客户端
           client.close();
      } catch (MqttException e) {
           throw new RuntimeException(e);
      }
  }
}

订阅 MQTT 主题

创建一个订阅客户端类 SubscribeSample,该类将订阅主题 mqtt/test。

package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SubscribeSample {
   public static void main(String[] args) {
       String broker = "tcp://127.0.0.1:1883";
       String topic = "mqtt/test";
       String username = "admin";
       String password = "public";
       String clientid = "subscribe_client";
       int qos = 0;

       try {
           MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
           // 连接参数
           MqttConnectOptions options = new MqttConnectOptions();
           options.setUserName(username);
           options.setPassword(password.toCharArray());
           options.setConnectionTimeout(60);
      options.setKeepAliveInterval(60);
           // 设置回调
           client.setCallback(new MqttCallback() {

               public void connectionLost(Throwable cause) {
                   System.out.println("connectionLost: " + cause.getMessage());
              }

               public void messageArrived(String topic, MqttMessage message) {
                   System.out.println("topic: " + topic);
                   System.out.println("Qos: " + message.getQos());
                   System.out.println("message content: " + new String(message.getPayload()));

              }

               public void deliveryComplete(IMqttDeliveryToken token) {
                   System.out.println("deliveryComplete---------" + token.isComplete());
              }

          });
           client.connect(options);
           client.subscribe(topic, qos);
      } catch (Exception e) {
           e.printStackTrace();
      }
  }
}
  • MqttCallback 说明:

connectionLost(Throwable cause): 连接丢失时被调用
messageArrived(String topic, MqttMessage message): 接收到消息时被调用
deliveryComplete(IMqttDeliveryToken token): 消息发送完成时被调用

测试

接下来运行 SubscribeSample,订阅 mqtt/test 主题。 然后运行 PublishSample,发布消息到 mqtt/test 主题。 我们将会看到发布端成功发布消息,同时订阅端接收到消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/742172.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在vite创建的vue3项目中使用Cesium标记地点(基于加载建筑样式,划分区域)

在vite创建的vue3项目中使用Cesium标记地点&#xff08;基于加载建筑样式&#xff0c;划分区域&#xff09; 使用vite创建vue3项目 npm create vitelatestcd到创建的项目文件夹中 npm install安装Cesium npm i cesium vite-plugin-cesium vite -D配置 vite.config.js文件&#…

通过平均列比较两组迭代次数

( A, B )---3*30*2---( 1, 0 )( 0, 1 ) 让网络的输入只有3个节点&#xff0c;AB训练集各由6张二值化的图片组成&#xff0c;让差值结构中有6个1, 行分布是0&#xff0c;1&#xff0c;1&#xff0c;1&#xff0c;1&#xff0c;2列分布是2&#xff0c;2&#xff0c;2.统计迭代次…

Java-多线程编程——基础篇及相关面试题

这里写目录标题 一、前言二、进程与线程的基本概念三、为什么Java中引入多线程&#xff1f;3.1 并行处理3.2 提高性能3.3 提高响应能力3.4 资源共享3.5 异步编程 四、Java多线程-创建多线程的类和接口4.1 Thread类4.2 Runnable接口 五、示例代码5.1 使用Thread类创建多线程六、…

【Django学习】(十二)GenericAPIView_过滤_排序_分页

上篇文章初步接触了GenericAPIView&#xff0c;这次来更加深入的学习它&#xff0c;了解里面的一些使用和方法 get_object&#xff1a;源码中&#xff1a;处理查询集&#xff0c;并含有所需要得pk值,lookup_fieldget_queryset&#xff1a;源码中&#xff1a;先判断queryset是否…

全志F1C200S嵌入式驱动开发(linux移植)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing @163.com】 前面完成了uboot移植,下面就要进行linux移植。当然,理论上uboot只是为后续的os准备好了一个基础运行环境,实际运行的操作系统是不是选择linux,也不一定。如果为了实际生产的需要…

Golang环境搭建指南(Windows和linux)

前言&#xff1a; go语言和Java&#xff0c;Python&#xff0c;C语言等等基本一样&#xff0c;也是需要在系统内集成语言环境的。语言基本都一样&#xff0c;支持各种系统架构&#xff0c;比如&#xff0c;mac&#xff0c;Windows&#xff0c;linux系统支持。本文仅以最为常用…

Django_Paginator分页器

目录 分页器代码说明 简单demo 源码等资料获取方法 分页器代码说明 import os import random # 需要导入分页器类from django.core.paginator import Paginator, EmptyPage# 导入配置django配置文件 os.environ.setdefault(DJANGO_SETTINGS_MODULE, dailyfresh.settings)it…

Java的Hibernate框架中集合类数据结构的映射编写教程

Java的Hibernate框架中集合类数据结构的映射编写教程 一、集合映射 1.集合小介 集合映射也是基本的映射&#xff0c;但在开发过程中不会经常用到&#xff0c;所以不需要深刻了解&#xff0c;只需要理解基本的使用方法即可&#xff0c;等在开发过程中遇到了这种问题时能够查询…

《机器学习公式推导与代码实现》chapter5-线性判别分析LDA

《机器学习公式推导与代码实现》学习笔记&#xff0c;记录一下自己的学习过程&#xff0c;详细的内容请大家购买作者的书籍查阅。 线性判别分析 线性判别分析(linear discriminant analysis, LDA)是一种经典的线性分类方法&#xff0c;其基本思想是将数据投影到低维空间&…

openGauss学习笔记-06 openGauss 基本概念

文章目录 openGauss学习笔记-06 openGauss 基本概念6.1 数据库&#xff08;Database&#xff09;6.2 数据块&#xff08;Block&#xff09;6.3 行&#xff08;Row&#xff09;6.4 列&#xff08;Cloumn&#xff09;6.5 表&#xff08;Table&#xff09;6.6 数据文件&#xff08…

Opencv之角点 Harris、Shi-Tomasi 检测详解

角点&#xff0c;即图像中某些属性较为突出的像素点 常用的角点有以下几种&#xff1a; 梯度最大值对应的像素点两条直线或者曲线的交点一阶梯度的导数最大值和梯度方向变化率最大的像素点一阶导数值最大&#xff0c;但是二阶导数值为0的像素点 API简介&#xff1a; void c…

Go语言网络编程:HTTP服务端之底层原理与源码分析——http.HandleFunc()、http.ListenAndServe()

一、启动 http 服务 import ("net/http" ) func main() {http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {w.Write([]byte("ping...ping..."))})http.ListenAndServe(":8999", nil) }在 Golang只需要几行代…

MySQL存储过程和存储函数练习

创建表并插入数据 字段名 数据类型 主键 外键 非空 唯一 自增 id INT 是 否 是 是 否 name VARCHAR(50) 否 否 是 否 否 glass VARCHAR(50) 否 否 是 否 否 sch 表内容 id name glass 1 xiaommg glass 1 2 xiaojun glass 2 1、创建一个可以统计表格内记录条数的存储函数 &#…

耳夹式骨传导耳机哪个牌子好?耳夹骨传导耳机推荐

骨传导耳机品牌越来越多&#xff0c;选择骨传导耳机时可不是一件简单的事&#xff0c;在挑选的时候首先需要考虑到耳机自身的综合性能&#xff0c;以及耳机的配置如何都会影响到我们使用耳机的幸福感&#xff0c;接下来我来给大家挑选几款目前口碑不错的耳夹式骨传导耳机&#…

windows下使用cd命令切换到D盘的方法

windows下使用cd命令切换到D盘的方法 winr输入cmd进入终端

【CANFD详细介绍与CAN区别】

在汽车领域&#xff0c;随着人们对数据传输带宽要求的增加&#xff0c;传统的CAN总线由于带宽的限制难以满足这 种增加的需求。此外为了缩小CAN网络&#xff08;max. 1MBit/s&#xff09;与FlexRay(max.10MBit/s)网络的带宽差距&#xff0c;BOSCH公司推出了CAN FD。 CAN FD&…

基于控制屏障函数的安全关键系统二次规划(适用于ACC)(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 基于控制屏障函数的安全关键系统二次规划&#xff08;适用于ACC&#xff09;是一种用于自适应巡航控制&#xff08;ACC&#x…

Servlet文档2

servlet文档2 HttpServletRequest 获取请求头API getMethod()获取请求的方式getRequestURI()获取请求的uri&#xff08;相对路径&#xff09;getRequestURL()获取请求的url&#xff08;绝对路径&#xff09;getRemoteAddr()获取请求的地址getProtocol()获取请求的协议getRem…

Vue3 CSS v-bind 计算和三元运算

官方文档 中指出&#xff1a;CSS 中的 v-bind 支持 JavaScript 表达式&#xff0c;但需要用引号包裹起来&#xff1a; 例子如下&#xff1a; <script lang"ts" setup> const treeContentWidth ref(140); </script><style lang"less" scop…

mschart Label Formart显示数值的格式化

默认这个数值想显示2位小数&#xff0c; 格式化代码如下。 series1.Label "#VAL{###.###}";