MQTT broker搭建并用SSL加密

news2024/9/19 9:43:09

系统为centos,基于emqx搭建broker,流程参考官方。

安装好后,用ssl加密。

进入/etc/emqx/certs,可以看到
在这里插入图片描述
分别为

  • cacert.pem CA 文件
  • cert.pem 服务端证书
  • key.pem 服务端key
  • client-cert.pem 客户端证书
  • client-key.pem 客户端key
    编辑emqx配置:vim /etc/emqx/emqx.conf,添加ssl配置:
listeners.ssl.default {
  # 端口
  bind = "0.0.0.0:8883"
  ssl_options {
    cacertfile = "/etc/emqx/certs/cacert.pem" #CA文件
    certfile = "/etc/emqx/certs/cert.pem"    #服务端证书
    keyfile = "/etc/emqx/certs/key.pem"   #服务端key
    verify = verify_peer  # 双向认证
    fail_if_no_peer_cert = true
  }
}

再在客户端(如MQTTX)配置连接信息:在这里插入图片描述

Springboot订阅MQTTS

添加依赖

        <!--mqtt-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <!--ssl-->
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcpkix-jdk15on</artifactId>
            <version>1.64</version>
        </dependency>

配置

spring:
  mqtt:
    provider:
      #MQTTS服务地址,端口号默认8883,如果有多个,用逗号隔开
      url: ssl://192.168.1.xx:8883
      #用户名
      username: xx
      #密码
      password: xxxx
      #客户端id(不能重复)
      client:
        id: provider-id
      #MQTT默认的消息推送主题,实际可在调用接口是指定
      default:
        topic: topic
    consumer:
      url: ssl://192.168.1.xx:8883
      #用户名
      username: xx
      #密码
      password: xxxx
      #客户端id(不能重复)
      client:
        id: consumer-id
      #MQTT默认的消息推送主题,实际可在调用接口时指定
      default:
        topic: topic

Mqtt配置

@Configuration
public class MqttConsumerConfig {
    @Value("${spring.mqtt.consumer.username}")
    private String username;

    @Value("${spring.mqtt.consumer.password}")
    private String password;

    @Value("${spring.mqtt.consumer.url}")
    private String hostUrl;

    @Value("${spring.mqtt.consumer.client.id}")
    private String clientId;

    @Value("${spring.mqtt.consumer.default.topic}")
    private String defaultTopic;

	// 把证书文件放在在resource的ssl_certs目录下
    String caFilePath = "/ssl_certs/cacert.pem";
    String clientCrtFilePath = "/ssl_certs/client-cert.pem";
    String clientKeyFilePath = "/ssl_certs/client-key.pem";


    /**
     * 客户端对象
     */
    private MqttClient client;

    /**
     * 在bean初始化后连接到服务器
     */
    @PostConstruct
    public void init() {
        connect();
    }

    /**
     * 客户端连接服务端
     */
    @SneakyThrows
    public void connect() {
//        try {
            //创建MQTT客户端对象
            client = new MqttClient(hostUrl, clientId, new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();

            SSLSocketFactory socketFactory = getSocketFactory(caFilePath,
                    clientCrtFilePath, clientKeyFilePath, "");
            options.setSocketFactory(socketFactory);

            /*允许所有host连接*/
            options.setSSLHostnameVerifier((s, sslSession) -> true);

            /*不配置可能出现报错java.security.cert.CertificateException: No subject alternative names present*/
            options.setHttpsHostnameVerificationEnabled(false);


            //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接到服务端都是以新的身份
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(username);
            //设置连接密码
            options.setPassword(password.toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false);
            options.setAutomaticReconnect(true);
            //设置回调
            client.setCallback(new MqttCallbackImpl());
            client.connect(options);
            System.out.println(" 客户端连接成功 ");
            //订阅主题
            //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
            int[] qos = {1, 1};
            //主题
            String[] topics = {"topicX#", "topicY#"};
            //订阅主题
//            client.subscribe("topicX",1);
            client.subscribe(topics);
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
    }

    /**
     * 断开连接
     */
    public void disConnect() {
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    /**
     * 订阅主题
     */
    public void subscribe(String topic, int qos) {
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private static SSLSocketFactory getSocketFactory(final String caCrtFile,
                                                     final String crtFile, final String keyFile, final String password)
            throws Exception {

        ClassPathResource caCrtFileRes = new ClassPathResource(caCrtFile);
        ClassPathResource crtFileRes = new ClassPathResource(crtFile);
        ClassPathResource keyFileRes = new ClassPathResource(keyFile);


        // add BouncyCastle provider
        Security.addProvider(new BouncyCastleProvider());

        // load CA certificate
        X509Certificate caCert = null;

        BufferedInputStream bis = new BufferedInputStream(caCrtFileRes.getStream());
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        while (bis.available() > 0) {
            caCert = (X509Certificate) cf.generateCertificate(bis);
            // System.out.println(caCert.toString());
        }

        // load client certificate
        bis = new BufferedInputStream(crtFileRes.getStream());
        X509Certificate cert = null;
        while (bis.available() > 0) {
            cert = (X509Certificate) cf.generateCertificate(bis);
            // System.out.println(caCert.toString());
        }

        // load client private key
        PEMParser pemParser = new PEMParser(new InputStreamReader(keyFileRes.getStream()));
        Object object = pemParser.readObject();
        PEMDecryptorProvider decProv = new JcePEMDecryptorProviderBuilder()
                .build(password.toCharArray());
        JcaPEMKeyConverter converter = new JcaPEMKeyConverter()
                .setProvider("BC");
        KeyPair key;
        if (object instanceof PEMEncryptedKeyPair) {
            System.out.println("Encrypted key - we will use provided password");
            key = converter.getKeyPair(((PEMEncryptedKeyPair) object)
                    .decryptKeyPair(decProv));
        } else {
            System.out.println("Unencrypted key - no password needed");
            key = converter.getKeyPair((PEMKeyPair) object);
        }
        pemParser.close();

        // CA certificate is used to authenticate server
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("ca-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
        tmf.init(caKs);

        // client key and certificates are sent to server so it can authenticate
        // us
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        ks.setCertificateEntry("certificate", cert);
        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
                new java.security.cert.Certificate[]{cert});
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
                .getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

        // finally, create SSL socket factory
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

        return context.getSocketFactory();
    }

}

@Slf4j
@Component
public class MqttCallbackImpl implements MqttCallback {

    @Override
    public void connectionLost(Throwable throwable) {
        log.info("[MQTT] 连接断开");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String msg = new String(message.getPayload());
        log.info(String.format("接收消息主题 : %s", topic));
        log.info(String.format("接收消息Qos : %d", message.getQos()));
        log.info(String.format("接收消息内容 : %s", msg));
        log.info(String.format("接收消息retained : %b", message.isRetained()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("发送消息成功");

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2112156.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ANSA联合abaqus的转动副创建方式

下面链接详细介绍了ANSA联合Abaqus创建转动副的过程&#xff1a; https://www.bilibili.com/video/BV1cb421b7z9/?spm_id_from333.880.my_history.page.clickhttps://www.bilibili.com/video/BV1cb421b7z9/?spm_id_from333.880.my_history.page.click

复盘高质量Vision Pro沉浸式视频的制作流程与工具

在探索虚拟现实(VR)和增强现实(AR)技术的过程中,高质量的沉浸式体验是至关重要的。最近,国外开发者Dreamwieber在其作品中展示了如何使用一系列工具和技术,创造出令人震撼的Vision Pro沉浸式视频。本文将详细复盘Dreamwieber的工作流,希望能为从事相关领域的开发者们提…

综合评价 | 基于熵权-变异系数-博弈组合法的综合评价模型(Matlab)

目录 效果一览基本介绍程序设计参考资料 效果一览 基本介绍 根据信息熵的定义&#xff0c;对于某项指标&#xff0c;可以用熵值来判断某个指标的离散程度&#xff0c;其信息熵值越小&#xff0c;指标的离散程度越大&#xff0c; 该指标对综合评价的影响&#xff08;即权重&…

【JAVA入门】Day34 - Stream流

【JAVA入门】Day34 - Stream流 文章目录 【JAVA入门】Day34 - Stream流一、Stream 流的作用和使用步骤1.Stream流的创建&#xff0c;数据的添加2. Stream流的中间方法3. Stream流的终结方法 Stream 流有什么作用&#xff1f;我们看一个例子&#xff1a; 【练习】需求&#xff…

SQL的高级查询练习知识点下(day26)

1 学习目标 重点掌握分组查询的语法 重点掌握分页查询的语法 2 分页查询 2.1 语法 SELECT 字段|表达式,... FROM 表 [WHERE 条件] [GROUP BY 分组字段] [HAVING 条件] [ORDER BY 排序的字段] LIMIT [起始的条目索引,]条目数; 2.2 特点 起始条目索引从0开始 limit子句放在…

ARM32开发——GD32F4 DMA功能查询

&#x1f3ac; 秋野酱&#xff1a;《个人主页》 &#x1f525; 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 DMA0DMA1 DMA0 DMA1

蔬菜识别数据集 蔬菜数据集 用于训练,有十种蔬菜,如图已经标注好的版本

数据集概述 该数据集包含十种常见的蔬菜&#xff1a;胡萝卜、包菜、水果辣椒、青瓜、南瓜、土豆、花菜和西红柿。数据集已经进行了精细的标注&#xff0c;适用于深度学习模型的训练&#xff0c;尤其是用于物体检测和分类任务。 数据集特点 种类多样&#xff1a;涵盖了八种蔬菜…

Github 2024-09-07Rust开源项目日报Top10

根据Github Trendings的统计,今日(2024-09-07统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Rust项目10CUE项目1Python项目1Go项目1Polars: Rust中的DataFrame接口和OLAP查询引擎 创建周期:1354 天开发语言:Rust, Python协议类型:MIT …

Vue-Pinia状态管理案列Demo

上一篇文章已经介绍了pinia的基本使用&#xff0c;现在做一个小的案列来巩固。 数据绑定修改pinia中的状态。 在页面刷新的时候会显示pinia中的数据 import { createApp } from vue // import ./style.css import App from ./App.vueimport { createPinia } from pinia cons…

心觉:接纳父母,就是接纳自己---创富第一步

Hi&#xff0c;我是心觉&#xff0c;与你一起玩转潜意识、脑波音乐和吸引力法则&#xff0c;轻松搞定人生挑战&#xff0c;实现心中梦想&#xff01; 挑战日更写作162/1000(完整记录在下面) 公门洞开纳百川 众心逐梦越千山 号召引领潜力绽 心觉潜意识无间 很多人抱怨父母&…

Linux是如何收发网络包的

Linux网 络协议栈 从上述⽹络协议栈&#xff0c;可以看出&#xff1a; 收发流程 ⽹卡是计算机⾥的⼀个硬件&#xff0c;专⻔负责接收和发送⽹络包&#xff0c;当⽹卡接收到⼀个⽹络包后&#xff0c;会通过 DMA 技术&#xff0c;将⽹络包放⼊到 Ring Buffer &#xff0c;这个是…

解决 Tomcat 启动时 JAR 包 `Invalid byte tag in constant pool` 异常问题

个人名片 &#x1f393;作者简介&#xff1a;java领域优质创作者 &#x1f310;个人主页&#xff1a;码农阿豪 &#x1f4de;工作室&#xff1a;新空间代码工作室&#xff08;提供各种软件服务&#xff09; &#x1f48c;个人邮箱&#xff1a;[2435024119qq.com] &#x1f4f1…

未来出行:高效智能的汽车充电桩

解析高效智能的汽车充电桩的结构设计技术要求 充电桩按照充电方式分为交流充电桩与直流充电桩、交直流一体充电桩三种。直流充电桩一般安装在高速公路&#xff0c;充电站等地&#xff1b;交流充电桩一般安装在小区、停车场、道路停车位、高速公路服务区等位置。根据国网Q/GDW4…

[3.4]【机器人运动学MATLAB实战分析】PUMA560机器人正运动学MATLAB计算

PUMA560是六自由度关节型机器人,其6个关节都是转动副,属于6R型操作臂。各连杆坐标系如图1,连杆参数如表1所示。 图1 PUMA560机器人的各连杆坐标系 表1 PUMA560机器人的连杆参数 按D-H方法建立操作臂运动学方程。建立PUMA560机器人运动学方程的步骤如下࿱

【网络安全】Jenkins任意文件读取漏洞及检测工具(CVE-2024-23897)

原创文章,不得转载。 文章目录 漏洞成因影响范围检测工具更多细节漏洞成因 Jenkins CLI 接口存在任意文件读取漏洞(CVE-2024-23897)。该问题源于 args4j 库在解析文件名参数时,会将@符号后的字符串视为文件名并尝试读取文件,而且该功能默认处于启用状态。 影响范围 Jen…

部署Apache网站

简易部署自己的apache网站 写在前面&#xff1a;先安装好mysql&#xff0c;再来搭建站点 1.安装php [rootlocalhost ~]# yum install php -y ##安装了php&#xff0c;默认会和apache结合工作2.创建文件编写php网页代码 [rootlocalhost ~]# vim /var/www/html/index.php ##创…

1-6 图像覆盖掩膜 opencv树莓派4B 入门系列笔记

目录 一、提前准备 二、代码详解 hsv cv2.cvtColor(img, cv2.COLOR_BGR2HSV) lower_range np.array([101, 100, 100], dtypenp.uint8) upper_range np.array([121, 255, 255], dtypenp.uint8) mask cv2.inRange(hsv, lower_range, upper_range) mask2 cv2.inRange(…

【Python】数据可视化之分类图

目录 条形图 箱形图 散点图 分簇散点图 小提琴 分簇小提琴 条形图 条形图是一种直观的图表形式&#xff0c;它通过不同长度的矩形条&#xff08;即“条形”&#xff09;来展示数值变量的中心趋势估计值&#xff0c;其中每个矩形的高度直接对应于该组数据的某个中心量度&…

urdf ( xacro ) 的 collision碰撞参数设置

目录 写在前面的话整体流程1 URDF 文件结构2 查看原始碰撞形状描述3 加入简单碰撞形状描述方法一 Meshlab 自动测量方法二 人为测量 4 加入XACRO函数简化描述 最终结果展示侧视图正视图碰撞几何体中心点设置不对出现的结果 写在前面的话 本文使用的 URDF 文件是由 solidworks …

百度飞浆OCR半自动标注软件OCRLabel配置【详细

今天帮标注人员写了一份完整的百度飞浆OCR标注软件的安装配置说明书、以供标注人员使用 包括各种环境安装包一起分享出来【conda\python\label项目包、清华源配置文件、pycharm社区版安装包】 提取码&#xff1a;umys 1、解压并安装tools文件下的miniconda,建议安装在D盘下的…