MQTT+Disruptor 提高物联网高并发

news2024/11/5 10:02:38

基于springboot2.5.7

废话不多说,直接上干货:

@Slf4j
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
@IntegrationComponentScan(basePackages = {"扫描包路径","扫描包路径"})
public class MqttAutoConfig {

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private DisruptorProperties disruptorProperties;

    @RefreshScope
    @Bean(value = "mqttParallelQueueHandler",initMethod = "start",destroyMethod = "shutDown")
    @ConditionalOnProperty(prefix = "custom-config.mqtt", name = "disruptor", havingValue = "true")
    public ParallelQueueHandler mqttParallelQueueHandler(){
        log.info("初始化Disruptor...");
        return new ParallelQueueHandler.Builder<DisruptorEventData>()
                .setDisruptorProperties(disruptorProperties)
                .setWaitStrategy(new BlockingWaitStrategy())
                .setListener(new MQTTMsgListener())
                .build();
    }

    @Bean
    @ConditionalOnProperty(prefix = "custom-config.mqtt", value = {"username","password", "host-url"})
    public MqttConnectOptions getReceiverMqttConnectOptionsForSub(){
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttProperties.getUsername());
        mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
        List<String> hostList = Arrays.asList(mqttProperties.getHostUrl().trim().split(","));
        String[] serverURIs = new String[hostList.size()];
        hostList.toArray(serverURIs);
        mqttConnectOptions.setServerURIs(serverURIs);
        mqttConnectOptions.setKeepAliveInterval(2);
        mqttConnectOptions.setAutomaticReconnect(true);
        return mqttConnectOptions;
    }

    /**
     *  MQTT 连接工厂
     * @return MqttPahoClientFactory
     */
    @Bean
    @ConditionalOnMissingBean
    public MqttPahoClientFactory receiverMqttClientFactoryForSub(MqttConnectOptions mqttConnectOptions) {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions);
        log.info("【MQTT】-初始化连接工厂...");
        return factory;
    }

    /**
     * 出站通道
     * @return MessageChannel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     *  MQTT 消息发送处理器
     * @return MessageHandler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                mqttProperties.getClientId()+"out", factory);

        messageHandler.setDefaultQos(1);
        //开启异步
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("test");
        return messageHandler;
    }


    /**
     * 此处可以使用其他消息通道
     * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。
     *
     * @return MessageChannel
     */
    @Bean
    public MessageChannel mqttInBoundChannel() {
        return new DirectChannel();
    }

    /**
     * 适配器, 多个topic共用一个adapter
     * 客户端作为消费者,订阅主题,消费消息
     */
    @Bean
    @ConditionalOnMissingBean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttPahoClientFactory factory) {
        List<String> topics = mqttProperties.getSubscribeTopics();
        String[] topicArray = new String[topics.size()];
        for (int i = 0; i < topics.size(); i++) {
            topicArray[i] = "$queue/"+ topics.get(i);
        }
        log.info("【MQTT】-订阅TOPIC:{}", Arrays.toString(topicArray));
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId(), factory, topicArray);
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setRecoveryInterval(10000);
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInBoundChannel());
        return adapter;
    }

    @Autowired(required = false)
    @Qualifier("mqttParallelQueueHandler")
    private ParallelQueueHandler<DisruptorEventData> parallelQueueHandler;
    /**
     * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。
     * @return MessageHandler
     */
    @Bean
    @RefreshScope
    @ServiceActivator(inputChannel = "mqttInBoundChannel")
    public MessageHandler mqttMessageHandler() {
        // 获取配置中的设备品牌
        MqttProperties.DeviceBrand deviceBrand = mqttProperties.getDeviceBrand();
        boolean disruptor = mqttProperties.isDisruptor();
        // 获取所有实现了 CustomMqttMessageHandler 接口的 Bean
        return message -> {
            log.info("【MQTT】-收到MQTT消息,Topic: {}, Payload: {}",
                    message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
                    message.getPayload());
            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
            Map<String, CustomMqttMessageReceiverHandler> handlers = applicationContext.getBeansOfType(CustomMqttMessageReceiverHandler.class);

            boolean handled = false;
            if (Objects.nonNull(deviceBrand)){
                CustomMqttMessageReceiverHandler handler = handlers.get(deviceBrand.getServiceName());
                if (Objects.nonNull(handler)){
                    if (handler.supportsTopic(topic)) {
                        handled = this.run(handler,message,topic,disruptor);
                    }
                }else {
                    log.error("【MQTT】-未找到设备品牌消息接收处理器,deviceBrand->{}",deviceBrand);
                }
            }else {
                for (CustomMqttMessageReceiverHandler handler : handlers.values()) {
                    if (handler.supportsTopic(topic)) {
                        handled = this.run(handler,message,topic,disruptor);
                    }
                }
            }
            if (!handled) {
                log.warn("【MQTT】-未找到匹配的处理器来处理Topic {} 的消息", topic);
            }
        };
    }

    @Bean
    @ConditionalOnProperty(prefix = "custom-config.mqtt", value = {"username","password", "host-url"})
    public MqttMessageSender mqttMessageSender(){
        return new MqttMessageSender();
    }

    private boolean run(CustomMqttMessageReceiverHandler handler,Message<?> message,String topic,boolean disruptor){
        try {
            String traceId = MDC.get("traceId");
            if (!StringUtils.hasText(traceId)){
                traceId = UUID.randomUUID().toString().replaceAll("-", "");
                MDC.put("traceId",traceId);
            }
            if (disruptor && Objects.nonNull(parallelQueueHandler)){
                log.info("【MQTT】-使用Disruptor处理...");
                DisruptorEventData data = new DisruptorEventData();
                Map<String,Object> map = new HashMap<>();
                map.put("data",message);
                map.put("handler",handler);
                map.put("traceId",traceId);
                data.setMessage(map);
                parallelQueueHandler.add(data);
            }else {
                handler.handleMessage(message);
            }
            return true;
        } catch (Exception e) {
            log.error("【MQTT】-Handler {} 处理Topic {} 的消息时出错", handler.getClass().getSimpleName(), topic, e);
            return false;
        }finally {
            MDC.clear();
        }
    }

由于涉及隐私,其余代码可以留言

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2232495.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

闯关leetcode——3289. The Two Sneaky Numbers of Digitville

大纲 题目地址内容 解题代码地址 题目 地址 https://leetcode.com/problems/the-two-sneaky-numbers-of-digitville/description/ 内容 In the town of Digitville, there was a list of numbers called nums containing integers from 0 to n - 1. Each number was suppos…

#Jest进阶知识:整合 webpack 综合练习

这一小节&#xff0c;我们来做一个综合的练习&#xff0c;该练习会整合&#xff1a; typescriptwebpackjest 准备工作 首先创建项目目录&#xff0c;通过 npm init -y 进行初始化。 整个项目我们打算使用 typescript 进行开发&#xff0c;因此需要安装 typescript npm i t…

MATLAB——矩阵操作

内容源于b站清风数学建模 数学建模清风老师《MATLAB教程新手入门篇》https://www.bilibili.com/video/BV1dN4y1Q7Kt/ 目录 1.MATLAB中的向量 1.1向量创建方法 1.2向量元素的引用 1.3向量元素修改和删除 2.MATLAB矩阵操作 2.1矩阵创建方法 2.2矩阵元素的引用 2.3矩阵…

原生鸿蒙应用市场开发者服务的技术解析:从集成到应用发布的完整体验

文章目录 引言一、鸿蒙原生应用的高效开发二、用户隐私保护&#xff1a;安全访问管理三、开发者实用工具&#xff1a;应用分析与A/B测试四、应用审核与分发&#xff1a;快速上线4.1 应用加密&#xff1a;保护代码安全4.2 自动化测试与检测前移&#xff1a;提升应用质量 五、结语…

基于SSM+微信小程序的社团登录管理系统(社团1)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1 、功能描述 2、项目技术 3、开发环境 4、功能介绍 1、项目介绍 基于SSM微信小程序的社团登录管理系统实现了管理员及社团、用户。 1、管理员实现了首页、用户管理、社团管理、社团信息管理、社…

虚拟化环境中的精简版 Android 操作系统 Microdroid

随着移动设备的普及和应用场景的多样化&#xff0c;安全性和隐私保护成为了移动操作系统的重要课题。Google推出的Microdroid&#xff0c;是一个专为虚拟化环境设计的精简版Android操作系统&#xff0c;旨在提供一个安全、隔离的执行环境。本文将详细介绍Microdroid的架构、功能…

手动搭建 Java Web 环境

操作场景 本文档介绍如何在 Linux 操作系统的腾讯云云服务器&#xff08;CVM&#xff09;上手动搭建 Java Web 环境。 进行手动搭建 Java Web 环境&#xff0c;您需要熟悉 Linux 命令&#xff0c;例如 CentOS 环境下通过 YUM 安装软件 等常用命令&#xff0c;并对所安装软件使…

WPF+MVVM案例实战与特效(二十四)- 粒子字体效果实现

文章目录 1、案例效果2、案例实现1、文件创建2.代码实现3、界面与功能代码3、总结1、案例效果 提示:这里可以添加本文要记录的大概内容: 2、案例实现 1、文件创建 打开 Wpf_Examples 项目,在 Views 文件夹下创建窗体界面 ParticleWindow.xaml,在 Models 文件夹下创建粒子…

「Mac畅玩鸿蒙与硬件18」鸿蒙UI组件篇8 - 高级动画效果与缓动控制

高级动画可以显著提升用户体验&#xff0c;为应用界面带来更流畅的视觉效果。本篇将深入介绍鸿蒙框架的高级动画&#xff0c;包括弹性动画、透明度渐变和旋转缩放组合动画等示例。 关键词 高级动画弹性缓动自动动画缓动曲线 一、Animation 组件的高级缓动曲线 缓动曲线&#…

Golang--数组、切片、映射

1、数组 1.1 数组类型 var 数组名 [数组大小]数据类型 package main import "fmt"func main(){//1、定义一个数组var arr1 [5]intarr1[0] 100arr1[1] 200fmt.Println(arr1) //[100 200 0 0 0] } 1.2 数组的初始化方式 package main import "fmt" func …

Android音频进阶之PCM设备创建(九十三)

简介: CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布:《Android系统多媒体进阶实战》🚀 优质专栏: Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏: 多媒体系统工程师系列【原创干货持续更新中……】🚀 优质视频课程:AAOS车载系统+…

【已解决】C# NPOI如何设置单元格格式

前言 设置单元格格式我们做表格必须要的一步&#xff0c;那么如何对单元格进行设置呢&#xff1f;直接上图看看效果图先&#xff0c;我做的是一个居中然后字体变化的操作&#xff0c;其他的查他的手册即可。 解决方法 直接上代码 IWorkbook excelDoc new XSSFWorkbook();…

系统学习算法:专题一 双指针

题目一&#xff1a; 算法原理&#xff1a; 首先我们可以对这道题目进行题目分类&#xff0c;像这种对数组以某种标准而进行一定的划分的题目&#xff0c;我们统称为数组分块问题&#xff0c;其中使用到的算法就是双指针算法&#xff0c;这里的指针并非真正int*这种&#xff0c…

Python异常检测 - LSTM(长短期记忆网络)

系列文章目录 Python异常检测- Isolation Forest&#xff08;孤立森林&#xff09; python异常检测 - 随机离群选择Stochastic Outlier Selection (SOS) python异常检测-局部异常因子&#xff08;LOF&#xff09;算法 Python异常检测- DBSCAN Python异常检测- 单类支持向量机(…

【双指针】【数之和】 LeetCode 633.平方数之和

算法思想&#xff1a; 双指针枚举i,j&#xff1b;类似三数之和 class Solution { public:bool judgeSquareSum(int c) {long long sum0;vector<int> dp;dp.push_back(0);long long start1;while(sum < c){sum start *start;if(sum>c) break;else dp.push_back(…

前端Nginx的安装与应用

目录 一、前端跨域方式 1.1、CORS(跨域资源共享) 1.2、JSONP(已过时) 1.3、WebSocket 1.4、PostMessage 1.5、Nginx 二、安装 三、应用 四、命令 4.1、基本操作命令 4.2、nginx.conf介绍 4.2.1、location模块 4.2.2、反向代理配置 4.2.3、负载均衡模块 4.2.4、通…

Openlayers高级交互(18/20):根据feature,将图形适配到最可视化窗口

本示例的目的是介绍如何在vue+openlayers中使用extent,使用feature fit的方式来适配窗口。当加载到页面上几个图形要充分展示在窗口的时候,可以用这种方式来平铺到页面中。 效果图 专栏名称内容介绍Openlayers基础实战 (72篇)专栏提供73篇文章,为小白群体提供基础知识及示…

每日OJ题_牛客_相差不超过k的最多数_滑动窗口_C++_Java

目录 牛客_相差不超过k的最多数_滑动窗口 题目解析 C代码 Java代码 牛客_相差不超过k的最多数_滑动窗口 相差不超过k的最多数_牛客题霸_牛客网 (nowcoder.com) 描述&#xff1a; 给定一个数组&#xff0c;选择一些数&#xff0c;要求选择的数中任意两数差的绝对值不超过 …

初始JavaEE篇——多线程(5):生产者-消费者模型、阻塞队列

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a;JavaEE 文章目录 阻塞队列生产者—消费者模型生产者—消费者模型的优势&#xff1a;生产者—消费者模型的劣势&#xff1a; Java标准库中的阻…

后端eclipse——文字样式:UEditor富文本编辑器引入

目录 1.富文本编辑器的优点 2.文件的准备 3.文件的导入 导入到项目&#xff1a; 导入到html文件&#xff1a; ​编辑 4.富文本编辑器的使用 1.富文本编辑器的优点 我们从前端写入数据库时&#xff0c;文字的样式具有局限性&#xff0c;不能存在换行&#xff0c;更改字体…