【物联网】使用RabbitMQ作为MQTT服务端并自定义设备连接权限

news2025/1/11 23:38:28

文章目录

  • 项目背景
  • 一、部署RabbiqMQ
  • 二、设备连接鉴权
    • 1.开启插件
    • 2.修改配置
    • 3.连接鉴权
    • 4.消息鉴权
  • 总结


项目背景

最近公司启动了一个新的物联网项目,使用MQTT协议与设备通信,在比较了各大MQTT服务后,决定选用开源的RabbitMQ搭建我们的服务端。我们的目标是能够支撑10万台设备同时在线,因此比较看重集群和高可用功能,RabbitMQ在这方面十分优异,同时RabbitMQ也能够兼顾项目中的消息中间件功能,缺点是仅支持3.1.1版本的协议,但对于我们这个项目来说够用。
在设计初期考虑给每一条通讯信息加密来保证安全性,但考虑到10万台设备并发量巨大,每一条消息都加解密会导致服务器计算压力过大,因此决定在设备连接、发布消息以及监听主题时来控制权限。


一、部署RabbiqMQ

在实际生产中我们部署的是集群,在本文简化为单机模式。建议通过RPM安装RabbiqMQ,具体方法不是本文重点,可以参考这篇文章。

在安装成功后需要开启MQTT插件:

rabbitmq-plugins enable rabbitmq_mqtt

在安装RabbitMQ后默认只有guest用户,我们一般会创建一个admin用户,使用MQTTX这个工具来测试我们的MQTT服务端是否工作正常,使用admin用户连接,注意将MQTT版本改为3.1.1:
在这里插入图片描述
由于本文的重点在于设备自定义鉴权,因此服务搭建过程较为简略,如果遇到问题无法连接,请参考其他文章逐步排查。

《使用RabbitMQ搭建MQTT服务》

二、设备连接鉴权

1.开启插件

想让RabbiqMQ走我们自定义的鉴权接口,需要先开启 rabbitmq_auth_backend_http 插件,同时该插件还推荐配合 rabbitmq_auth_backend_cache 通过缓存减轻授权认证服务器压力。

rabbitmq-plugins enable rabbitmq_auth_backend_http
rabbitmq-plugins enable rabbitmq_auth_backend_cache

另外使用 rabbitmq-plugins list 命令可以查看已经开启的插件和版本。

2.修改配置

然后我们可以去下载官方示例,打开示例项目的 AuthBackendHttpController 类,并将默认用户从guest改为admin:

    private final Map<String, User> users = new HashMap<String, User>() {{
        put("admin", new User("admin", "admin", asList("administrator", "management")));
        put("springy", new User("springy", "springy", asList("administrator", "management")));
    }};

然后启动服务,稍后我们会在官方示例的基础上进行鉴权逻辑的开发。

在服务启动成功后,我们需要到服务器上修改RabbitMQ的配置以告知我们的接口地址,使用RPM安装的配置文件应该在 /etc/rabbitmq/rabbitmq.config,如果没有也可以自己创建。
使用 rabbitmqctl environment 可以看到当前默认配置:

{rabbit,
     [{auth_backends,[rabbit_auth_backend_internal]},
      {auth_mechanisms,['PLAIN','AMQPLAIN']},
...
{rabbitmq_auth_backend_cache,
     [{cache_module,rabbit_auth_cache_ets},
      {cache_module_args,[]},
      {cache_refusals,false},
      {cache_ttl,15000},
      {cached_backend,rabbit_auth_backend_internal}]},
 {rabbitmq_auth_backend_http,
     [{http_method,get},
      {resource_path,"http://localhost:8000/auth/resource"},
      {topic_path,"http://localhost:8000/auth/topic"},
      {user_path,"http://localhost:8000/auth/user"},
      {vhost_path,"http://localhost:8000/auth/vhost"}]},
...

我们需要覆盖默认配置,在 rabbitmq.config 中找到 auth_backends 项,在数组中增加 rabbit_auth_backend_cache,把 rabbitmq_auth_backend_http 这一项中四个接口地址改为正确的地址(localhost改为你的IP)。
修改成功后重启服务:

service rabbitmq-server restart 

再次使用 rabbitmqctl environment 查看配置是否生效。

3.连接鉴权

此时再使用admin/admin账号通过MQTTX工具连接服务,因该能够看到接口打印的日志“认证通过”。

下面我们开始着手改造示例中的 user 方法,我们的设备都会有自己的设备名称(deviceName)以及产品ID(productId),每个产品又会有自己的密钥(productKey),其中每台设备的设备名称不允许重复,因此我们使用产品ID和设备名称结合作为连接的用户名({deviceName}@{productId}),密码则使用上述三个字段做MD5生成。

注:实际生产中密码的生成规则会更加复杂,此处仅作示例。

假定上述三个字段分别如下:

        String deviceName = "87654321";
        String productId = "123456";
        String productKey = "abcdef";

执行代码:

        String password = Md5Utils.hash(deviceName + productId + productKey);
        System.out.println(password);

输出结果为:5b0e93055c3bf7db0fbc1eb19f2a3777

综上所述我们期望设备连接的正确用户名为:

87654321@123456

连接密码为:

5b0e93055c3bf7db0fbc1eb19f2a3777

改造后的完整代码如下:

    private final static String DEFAULT_USERNAME = "admin";

    private final static String DEFAULT_PASSWORD = "admin";

    private final static String ALLOW = "allow";

    private final static String DENY = "deny";

    private final static String PRODUCT_KEY = "abcdef";

    private final Map<String, User> users = new HashMap<String, User>() {{
        put(DEFAULT_USERNAME, new User(DEFAULT_USERNAME, DEFAULT_PASSWORD, asList("administrator", "management")));
        put("springy", new User("springy", "springy", asList("administrator", "management")));
    }};

    @RequestMapping("user")
    public String user(@RequestParam("username") String username,
                       @RequestParam("password") String password) {
        LOGGER.info("Trying to authenticate user {} password {}", username, password);
        User user = users.get(username);
        //系统默认用户直接放过,用于服务端访问及测试
        if (user != null && user.getPassword().equals(password)) {
            LOGGER.info("认证通过");
            return "allow " + collectionToDelimitedString(user.getTags(), " ");
        } else {
            //设备(客户端)用户
            if (username.contains("@")) {
                String[] array = username.split("@");
                if (array.length < 2) {
                    return DENY;
                }
                String deviceName = array[0];
                String productId = array[1];
                //开始验证密码
                String myPassword = this.genPassword(deviceName, productId);
                if (myPassword.equals(password)) { //密码验证通过
                	LOGGER.info("用户{}认证通过", username);
                    //去数据库验证是否存在该产品,如果设备是先注册到平台才允许连接,那么还需要校验deviceName,具体代码省略
                    return this.checkProduct(deviceName, productId) ? ALLOW : DENY;
                } else {
                    LOGGER.warn("用户{}认证失败", username);
                    return DENY;
                }
            } else {
                return DENY;
            }
        }
    }
    
    private String genPassword(String deviceName, String productId) {
        return Md5Utils.hash(deviceName + productId + PRODUCT_KEY);
    }

重启服务,使用MQTTX进行连接测试,如果用户名或密码错误,将会弹出提示:

Error: Connection refused: Bad username or password

如果连接正常,则会打印日志:

用户87654321认证通过

仔细观察日志会发现vhost和resource方法也被调用了,由于我们只有一个默认vhost:“/”,因此没有必要验证,全部返回allow即可,根据我们的业务resource也不需要处理,返回allow。

4.消息鉴权

在改造代码前我们先简单说一说MQTT中的主题设计,当若干设备把消息发送到RabbitMQ后,RabbitMQ会根据订阅情况把消息转发给订阅者,假设我们所有设备都使用同一个主题发送消息,那么我们订阅者(服务端)只能通过解析报文后获取设备名称来区分消息的发送者,因此,任何一台设备都可以通过修改报文来模拟成其他任意一台设备,这属于一个潜在风险。

为了避免上述风险,我们希望能够限制设备连接到RabbitMQ后的行为,避免设备发送和获取自己权限外的数据,具体的方法就是每台设备使用自己的主题与订阅者(服务端)交互,MQTT协议中主题并不是预设的,是可以在运行时任意创建的,因此可以使用设备名称和产品ID动态创建主题。

我们假设设备上送消息的主题为:

topic/{deviceName}/{productId}/upload

服务端响应的主题为:

topic/{deviceName}/{productId}/upload/reply

下面我们着手改造topic方法,topic方法有一个参数TopicCheck类,其中对我们有用的属性是routing_key、username、permission。
设备发送消息到 topic/87654321/123456/upload 这个主题时,routing_key参数的值为 topic.87654321.123456.upload,把".“替换成”/"则可以还原主题。username是连接时的用户名,我们可以从中获取设备名称和产品ID,permission分为读和写,对应的是消息的监听和发送,这两个主题是不同的。

改造后代码如下:

//设备允许订阅的主题
    private static final List<String> deviceReadAllowTopic = Arrays.asList("topic/%s/%s/upload/reply")
    //设备允许发布的主题
    private static final List<String> deviceWriteAllowTopic = Arrays.asList("topic/%s/%s/upload")

    @RequestMapping("topic")
    public String topic(TopicCheck check) {
        LOGGER.info("校验topic={} ", check.getRouting_key());
        String username = check.getUsername();
        if (DEFAULT_USERNAME.equals(username)) { //默认管理员账户直接放过
            return ALLOW;
        }
        String permission = check.getPermission();
        String[] array = username.split("@");
        String deviceName = array[0];
        String productId = array[1];
        String routingKey = check.getRouting_key();
        String topic = routingKey.replaceAll("\\.", "/");
        if ("read".equals(permission)) {
            for (String s : deviceReadAllowTopic) {
                String f = String.format(s, deviceName, productId);
                if (f.equals(topic)) { //匹配上了则放过
                    return ALLOW;
                }
            }
            LOGGER.warn("设备{}订阅主题{}不在允许的列表内", username, topic);
            return DENY;
        } else if ("write".equals(permission)) {
            for (String s : deviceWriteAllowTopic) {
                String f = String.format(s, deviceName, productId);
                if (f.equals(topic)) { //匹配上了则放过
                    return ALLOW;
                }
            }
            LOGGER.warn("设备{}发布主题{}不在允许的列表内", username, topic);
            return DENY;
        } else {
            return ALLOW;
        }
    }

使用MQTTX测试,会看到如果尝试给权限范围外的主题发送消息或者订阅没有权限的主题则会断开连接。

总结

至此,对RabbitMQ的权限控制就完成了,同学们可以根据项目的实际情况来修改鉴权方式,如果设备数量较多,则务必使用缓存缓解并发压力。

源码下载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/687846.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

最专业的敏捷需求管理工具推荐

为了协助大家找到合适的需求管理工具&#xff0c;我们选择了国内外几款款工具作比对&#xff1a; Leangoo领歌敏捷工具 Jama Software Visure Requirements IBM DOORS Next ReqSuite RM ReQtest Xebrio Orcanos Helix RM SpiraTeam Accompa Innoslate Leangoo领歌…

Python学习——元组

一、元组的定义 这部分就没有增、删、改操作了&#xff0c;是因为元组是一个不可变序列&#xff0c;元组也是Python内置的数据结构之一。 补充&#xff1a;关于可变序列与不可变序列 可变序列是指可以对序列进行增、删、改的操作&#xff0c;对象地址不发生变化。常见的可变序列…

【Jvm】Java类加载机制是什么?

文章目录 一、目标&#xff1a;二、原理 &#xff08;类的加载过程及其最终产品&#xff09;三、过程&#xff08;类的生命周期&#xff09;3.1、加载3.2、校验3.3、准备3.4、解析3.5、初始化 四、类加载器五、双亲委派机制 一、目标&#xff1a; 什么是类的加载&#xff1f;类…

vue3.x+elementPlus+swiper+vuedraggable实现页面装修

前言 该实现代码依赖框架&#xff1a;vue3.xelementPlusswipervuedraggable&#xff0c;做好前期工作&#xff0c;可直接在下面的附件处点击下载链接来下载相关文件&#xff1b;文件中包括搜索/图文广告/滚动消息三个模块代码示例&#xff0c;其他组件实现思路相同&#xff0c…

APT 系列 (一):APT 筑基之反射

什么是反射&#xff1f; 简单来讲&#xff0c;反射就是&#xff1a;已知一个类&#xff0c;可以获取这个类的所有信息 一般情况下&#xff0c;根据面向对象封装原则&#xff0c;Java实体类的属性都是私有的&#xff0c;我们不能获取类中的属性。但我们可以根据反射&#xff0…

问题解决 |关于CUDA的代码错误总结以及解决方法

本博客主要关于常见的CUDA的代码错误总结以及解决方法~ 1.RuntimeError运行错误 1.1.RuntimeError: CUDA error: out of memory CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect. For debugging cons…

Day09 Python面向对象和异常详解

文章目录 第六章 Python面向对象6.1. 面向对象基础6.1.1. 面向对象与面向过程6.1.2. 类与对象6.1.3. 类的设计与对象的实例化6.1.4. 构造方法6.1.5. 魔术方法6.1.6. 类与类的关系使用到另一个类的对象完成需求使用到另一个类的对象作为属性 6.2. 面向对象进阶6.2.1. 封装6.2.1.…

开启AI原型设计新时代:数字创意的崭新前景

随着人工智能生成内容&#xff08;AIGC&#xff09;相关研究的突破&#xff0c;人类社会正面临一个全新的转折点。诸如多模态、可控扩散模型和大型语言模型等技术正在直接改变创意设计领域的生产过程。 在AIGC领域中&#xff0c;根据输入内容和输出形式的差异&#xff0c;我们…

【MySQl】索引及其B+树

目录 一、索引初识和测试数据的构建 二、磁盘 三、MySQL、OS、磁盘的交互方式&#xff08;InnoDB 存储引擎&#xff09; 四、MySQL中索引和page的理解 1、为什么MySQL和磁盘进行IO交互的时候&#xff0c;要采用page的方案进行交互&#xff0c;而不是采用用多少&#xff0c…

O2O跑腿快递可以解决哪些问题?

各大电商巨头都已经布局了O2O快递&#xff0c;就目前国内的快递环境而言&#xff0c;基本已经形成了四通一达局面&#xff0c;那么O2O同城配送目前有何痛点呢?下面小编就来为大家分析分析&#xff0c;感兴趣的朋友快来一起了解了解吧! 一、O2O快递目前存在哪些痛点? 我们国…

力扣题库刷题笔记14--最长公共前缀

1、题目如下&#xff1a; 2、个人Python代码实现 首先讲一下思路&#xff0c;通俗的来讲&#xff0c;就是依次比较字符串里面所有的字符&#xff0c;如果相同就是公共前缀&#xff0c;如果不同&#xff0c;后面就不用比较了&#xff0c;所以主要就是以下几点&#xff1a; 1、外…

[Leetcode] 0026. 删除有序数组中的重复项

26. 删除有序数组中的重复项 点击上方&#xff0c;跳转至Leetcode 题目描述 给你一个 升序排列 的数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使每个元素 只出现一次 &#xff0c;返回删除后数组的新长度。元素的 相对顺序 应该保持 一致 。 由于在某些语…

好用到爆的数据库软件,还能兼容Excel,可以抛弃“VF”和Access

现在大部人已经不再用VF “VF”这个缩写&#xff0c;也只是停留在那个时代里&#xff0c;很多人已经不知道原来的样子&#xff0c;但有的人却还感慨万千。 懂得人自然都懂&#xff01; 微软的两款数据库软件&#xff1a;一个是office的Access&#xff0c;另一个就是“VF” VF…

自定义数据类型:结构体,枚举,联合

之前我们已经了解过结构体&#xff0c;这篇文章再来深入学习的一下&#xff0c;然后再学习其他自定义数据类型&#xff0c;枚举和联合 目录 1.结构体 1.1 结构体类型的声明 1.2 结构体的自引用 1.3 结构体变量的定义和初始化 1.4 结构体内存对齐 1.5 结构体传参 1.6 结…

【Shermo学习】使用shermo批量读入ORCA频率计算结果文件,并批量输出热力学校正数据

使用shermo批量读入ORCA频率计算结果文件&#xff0c;并批量输出热力学校正数据 安装与运行简单任务示例批量输出热力学校正数据 Shermo是北京科音自然科学研究中心卢天老师开发的一个程序&#xff0c;可以用来处理量子化学计算过程中的热力学数据。本文基于Shermo程序&#xf…

数据库设计篇-范式与反范式

概述 一般地&#xff0c;在进行数据库设计时&#xff0c;应遵循三大原则&#xff0c;也就是我们通常说的三大范式&#xff0c;即第一范式要求确保表中每列的原子性&#xff0c;也就是不可拆分&#xff1b;第二范式要求确保表中每列与主键相关&#xff0c;而不能只与主键的某部…

抖音林客生活服务商平台

抖音林客生活服务服务商平台是为了方便服务商管理自己的服务和订单而设计的平台。以下是其主要功能&#xff1a; 服务管理&#xff1a;服务商可以在平台上添加自己提供的服务&#xff0c;并设置服务的价格、规格等信息&#xff1b; 订单管理&#xff1a;服务商可以查看…

【Git总结】

第三章Git常用命令 Git注意首次 安装必须设置一下用户签名&#xff0c;否则无法提交代码。 vim 文件名&#xff08;hellow.txt&#xff09;//进入编辑模式 cat 文件名&#xff08;hellow.txt&#xff09;//查看文件内容 i进入编辑模式&#xff0c;(Esc):wq保存退出 &#…

掌握imgproc组件:opencv-图像处理

图像处理 1.线性滤波&#xff1a;方框滤波、均值滤波、高斯滤波1.1 平滑处理1.2 图像滤波与滤波器1.3 线性滤波器的简介1.4 滤波和模糊1.5 邻域算子与线性邻域滤波1.6 方框滤波1.7 均值滤波1.8 高斯滤波1.9 线性滤波综合案例 2.非线性滤波&#xff1a;中值滤波、双边滤波2.1 中…

python自定义序列类深入学习

一&#xff1a;自定义序列类 1、序列类型的分类 容器序列&#xff1a; list 、 tuple、deque 扁平序列&#xff1a; str 、bytes、bytearray、arry.array 可变序列&#xff1a; list 、deque、bytearray、array 不可变&#xff1a; str、tuple、bytes 容器序列表示可以放置任意…