SpringBoot Disruptor框架遇到的问题

news2025/4/6 6:26:23

1.消息重复消费问题

问题描述:
项目中启动了多个消费者,测试中发现同一条消息被多次消费。

解决方案:
①幂等方案处理
②disrutor提供了不同的处理机制:
自定义消费者实现EventHandler接口,他是属于重复消费,
自定义消费者实现WorkHandler接口,他是属于竞争消费。

重复消费:

/**
 * describe 消费者服务-邮件发送
 *
 * @author 一叶孤舟
 * @date 2022年03月17日17:41
 */
public class EmailEventHandler implements EventHandler<EmailEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmailEventHandler.class);

    @Override
    public void onEvent(EmailEvent event, long sequence, boolean endOfBatch) throws Exception {
        ....
    }

竞争消费:

/**
 * describe 消费者服务-邮件发送
 *
 * @author 一叶孤舟
 * @date 2022年03月17日17:41
 */
public class EmailEventHandler implements WorkHandler<EmailEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmailEventHandler.class);

    /**
     * 邮件发送
     * 根据主键更新文件发送列表数据状态
     *
     * @param event 邮件发送参数
     */
    @Override
    public void onEvent(EmailEvent event) {
    ...
    }

2.消费者线程一直处于java.lang.Thread.State: WAITING (parking)状态

问题描述:
使用了top命令(具体命令使用我在这篇文章介绍)查看测试环境服务器信息,发现CPU竟然600%。。。估计是要跑路的节奏了。。。还是说正事吧。。。使用top命令查看了服务器,发现是JAVA进程导致的,根据JAVA的进程id,top到了占用CPU的线程,jstack打印了堆栈信息,罪魁祸首就是自己写的disrutor的代码导致的,消费者线程一直parking,裂开了。认真审视了自己的代码,确实有问题,当然了这段代码不止这个问题,我太菜了。。。,已经在改正的路上了,要不然就得提桶跑路了。。。

问题代码:


/**
 * describe Disruptor高性能队列服务
 *
 * @author 晴日朗
 * @date 2022年03月17日18:48
 */
@Service
public class EmailSendDisruptorService implements IEmailSendDisruptorService {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmailSendDisruptorService.class);

    @Autowired
    private LookCupRepository lookCupRepository;

    @Override
    public Result emailSendDisruptorData(List<Map<String, Object>> mapParamList) throws Exception {
        ExecutorService executor = null;
        WorkerPool<EmailEvent> workerPool = null;

        try {
            LOGGER.info("EmailSendDisruptorService.emailSendDisruptorData.start.params={},startTime={}", JSON.toJSONString(mapParamList), DateUtils.getNowDate());
            if (CollectionUtils.isEmpty(mapParamList)) {
                return Result.errorJson(201, "发送参数为空");
            }
            // 1.创建可以缓存的线程池,提供发给consumer
            executor = ThreadPoolMonitor.newCachedThreadPool("EmailSendDisruptorServicePool");
            // 2.创建event工厂
            EmailEventFactory emailEventFactory = new EmailEventFactory();
            // 3.定义ringBuffer数组大小,设置为2的N次方,位运算,效率高
            int ringBufferSize = 1024 * 1024;
            // 4.创建ringBuffer
            RingBuffer<EmailEvent> ringBuffer = RingBuffer.create(ProducerType.MULTI, emailEventFactory, ringBufferSize, new YieldingWaitStrategy());
            SequenceBarrier barriers = ringBuffer.newBarrier();
            // 5.注册消费者,可注册多个,分摊消费模式  消费者实现的是WorkHandle接口
            // 查询快码表,获取消费者数目,默认是3个
            List<LookCupDO> lookCupDOS = lookCupRepository.findAllDataByType("EMAIL_CONSUMERS");
            int consumersCounts = CollectionUtils.isEmpty(lookCupDOS) ? 3 : StringUtils.isEmpty(lookCupDOS.get(0).getColumn1()) ? 3 : Integer.parseInt(lookCupDOS.get(0).getColumn1());
            EmailEventHandler[] consumers = new EmailEventHandler[consumersCounts];
            for (int i = 0; i < consumers.length; i++) {
                consumers[i] = new EmailEventHandler();
            }
            workerPool = new WorkerPool<EmailEvent>(ringBuffer, barriers,
                    new EventExceptionHandler(), consumers);
            ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
            workerPool.start(executor);
            // 6.创建生产者
            EmailEventProducer producer = new EmailEventProducer(ringBuffer);
            // 7.业务参数投递
            for (int i = 0; i < mapParamList.size(); i++) {
                // 逐条发布,多线程并发处理
                producer.onData(mapParamList.get(i));
            }
        } catch (ParseException e) {
            throw new Exception(e);
        } finally {
            if (null != executor) {
                executor.shutdown();
            }
        }

        return Result.successJson(200, "操作成功", null);
    }
}

问题解决:

初步排查问题的根源在于配置的等待策略问题:YieldingWaitStrategy->修改为SleepingWaitStrategy。

介绍一下等待策略
①BlockingWaitStrategy
Disruptor的默认策略是BlockingWaitStrategy。在BlockingWaitStrategy内部是使用锁和condition来控制线程的唤醒。BlockingWaitStrategy是最低效的策略,但其对CPU 的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现。

②SleepingWaitStrategy
SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,通过使用LockSupport.parkNanos(1)来实现循环等待。一般来说Linux系统会暂停一个线程约60µs,这样做的好处是,生产线程不需要采取任何其他行动就可以增加适当的计数器,也不需要花费时间信号通知条件变量。但是,在 生产者线程和使用者线程之间移动事件的平均延迟会更高。它在不需要低延迟并且对生产线程的影响较小的情况最好。一个常见的用例是异步日志记录。

③YieldingWaitStrategy
YieldingWaitStrategy是可以使用在低延迟系统的策略之一。YieldingWaitStrategy 将自旋以等待序列增加到适当的值。在循环体内,将调用Thread.yield(),以允许其他排 队的线程运行。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

④BusySpinWaitStrategy
性能最好,适合用于低延迟的系统。在要求极高性能且事件处理线程数小于CPU逻辑核 心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

3.消费者创建过多

问题描述:

我这消费者过多不是指我自定义的消费者数目太多,而是指的代码车祸现场导致消费者数目过多。。。实际场景中,多个场景注入触发这个接口,并发触发,线程池使用的是Executors.newFixedThreadPool(),创建了大量的线程(消费者),服务器卡顿,人当场裂开。。。我领导说我可以下班了。。。

问题解决:

定义复用的线程池,提供给消费者的线程复用,节省开销。

4.GC问题

问题描述:
我引入这个disruptor框架就是满足生产消费模式,需要处理的数据量很大,然后测试的过程中发现这个接口并发次数达到一定程度,页面就会卡堵。

问题解决:
①初步预计是GC影响的,后期调整VM堆分配机制,根据实际业务场景分配合理大小。
②业务测限制,只有当数据处理完了才能接受请求。

小结:

具体的调整和disruptor使用完整代码在另一篇文章:【超链接,待提供】,只要你不怕,可以看看我的代码,希望不再出现令人当场裂开的画面。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/109786.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SVN培训笔记(下拉项目、同步修改、添加文件、修改文件、删除文件、改名文件等)

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/128417196 红胖子(红模仿)的博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软…

【矩阵论】7. 范数理论——非负/正矩阵

7.5 非负/正矩阵 7.5.1 定义 a. 非负/正矩阵定义 一个实矩阵 A(aij)∈RmnA(a_{ij})\in R^{m\times n}A(aij​)∈Rmn 若对每一 iii 和 jjj &#xff0c;aij≥0a_{ij}\ge 0aij​≥0 &#xff0c;则称A是非负矩阵&#xff0c;A≥0A\ge 0A≥0 若对每一 iii 和 jjj &#xff0c;…

简单四则运算语法树可视化

简单四则运算语法树可视化 前几天有一篇博客是关于四则运算和二叉树的&#xff0c;我是把四则运算用二叉树写出来&#xff08;我是用的 JSON 的形式来存储和表达的&#xff09;&#xff0c;并计算最终的结果。最近&#xff0c;也在继续这个方面的东西&#xff0c;不过遇到一些…

uni-app 微信支付-小程序、APP、IOS

小程序 支付 先看官方文档 https://pay.weixin.qq.com/wiki/doc/apiv3/open/pay/chapter2_8_2.shtml 知晓有那些比不可少的流程&#xff0c;之后后端确定返回的参数值&#xff0c;用于前端支付。参数值必须一一对应&#xff0c;不然支付会失败 uni.requestPayment({timeStamp…

mysql5.7主从复制配置

写在最前面&#xff1a;一入编程深似海&#xff0c;从此对象变路人&#xff08;码农没时间谈恋爱&#xff09;。很长一段时间连写个文章的时间都没有了&#xff0c;学完后端、学前端&#xff0c;前端刚入门又要搞容器化&#xff0c;这真是“一重山外一重关&#xff0c;关关难过…

如何在Angular框架中更好地使用字体?一篇文章解答!

作为前端开发人员&#xff0c;在Angular JS中构建项目时&#xff0c;使用自定义字体可能会很棘手。有时候&#xff0c;如果开发者想要把选择的字体添加到项目中&#xff0c;将不得不把它导入到现有的代码中。 PS&#xff1a;Kendo UI致力于新的开发&#xff0c;来满足不断变化…

信息安全产品认证

文章目录一、引言二、《网络关键设备和网络安全专用产品安全认证证书》2.1 背景2.2 产品目录2.3 认证依据标准2.4 认证机构三、《中国国家信息安全产品认证证书》3.1 背景3.2 产品目录3.3 行业跟进四、《IT产品信息安全认证证书》五、CCC认证5.1背景5.2 中国强制性产品认证体系…

多源传感器组合导航 GNSS 视觉SLAM LiDAR INS 开源项目总结

多源传感器组合导航 GNSS 视觉SLAM LiDAR INS 开源项目总结 本文更改自 吴桐wutong 微信公众号文章。 开源代码总览 名称传感器类型组合类型滤波方法备注RTKLIBG-KFGAMP、rtklibexplorerhttps://www.rtklib.com/GPSTKG-KFhttps://github.com/SGL-UT/GPSTkBNCG-KFppp_wizardK…

【多个IP地址用逗号分割开】vue简单实现,textarea文本域输入多个ip地址用逗号分隔开,根据空格分割

前言 这个功能也是很多地方会用到的。 一般使用的地方是比如需要设置白名单或者黑名单 然后页面上会有一个textarea文本域。 在文本域中输入多个ip地址&#xff0c;输入一个回车换一行。 然后点击保存后&#xff0c;把数据通过逗号隔开的格式传给后端 后端再去拿到每一个ip地址…

电脑重装系统win11如何更改默认下载路径

win11如何更改默认下载路径&#xff1f;当大家平日里面&#xff0c;在使用win11系统的时候&#xff0c;如果觉得某一个下载路径的内存空间已经满了的话&#xff0c;那么就必须要及时更改&#xff0c;下面是小编提 供的更改路径的方法。 工具/原料&#xff1a; 系统版本&#x…

ES集群节点角色更换

背景 如何在一个3节点集群中&#xff0c;将Master/Data角色的节点中的数据分散到其他数据节点中&#xff0c;将该节点角色变更为Master 操作步骤 构建集群 集群角色如下 m-01:master/data d-02:data d-03:data 集群配置文件&#xff1a; Master节点elasticsearch.yml配置文件…

关于Maven中引用的jar的version配置为版本区间自动使用最新的版本,maven是如何判断哪个版本更加新?

背景 在Maven中&#xff0c;a工程引入了个jar包&#xff08;b工程&#xff09;&#xff0c;可以使用区间引入的方式&#xff0c;类似于数学区间的写法&#xff0c;如下 <dependency><groupId>org.example</groupId><artifactId>demo-jar</artifac…

使用Anaconda安装TensorFlow详细教程

一、Anaconda安装 可以参考笔者的这篇博客&#xff1a;Anaconda安装详细教程 二、准备工作 1、单击启动Anaconda Prompt创建新虚拟环境 2、在Anaconda Prompt依次执行以下命令conda create -n pytorch python3.6&#xff0c;创建名字为tensorflow的虚拟环境&#xff0c;再通…

Nacos学习笔记 (2)配置管理

1. 什么是配置中心 1.1 什么是配置 应用程序在启动和运行的时候往往需要读取一些配置信息&#xff0c;配置基本上伴随着应用程序的整个生命周期&#xff0c;比如&#xff1a;数据库连接参数、启动参数等。 配置主要有以下几个特点&#xff1a; &#xff08;1&#xff09;配…

LeetCode刷题复盘笔记—一文搞懂动态规划之674. 最长连续递增序列问题(动态规划系列第三十篇)

今日主要总结一下动态规划的一道题目&#xff0c;674. 最长连续递增序列 题目&#xff1a;674. 最长连续递增序列 Leetcode题目地址 题目描述&#xff1a; 给定一个未经排序的整数数组&#xff0c;找到最长且 连续递增的子序列&#xff0c;并返回该序列的长度。 连续递增的子…

怎么给视频加水印?

怎么给视频加水印&#xff1f;不管你是在网上下载的视频还是直接在网上观看视频&#xff0c;都能发现这些视频上往往都会有水印&#xff0c;有的水印可能是logo&#xff0c;有的水印可能是文字&#xff0c;这些水印不仅可以防止视频被别人盗取&#xff0c;还能很好的给自己做宣…

【JavaScript】js的websocket封装调用

WebSocket 是 HTML5 开始提供的一种浏览器与服务器间进行全双工通讯的网络技术。 WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC 6455&#xff0c;WebSocketAPI 被 W3C 定为标准。 在 WebSocket API 中&#xff0c;浏览器和服务器只需要要做一个握手的动作&#xff0c;然…

4.triton c++使用

4.1 tritonclient c使用 4.2 triton c使用 4.3依赖安装 1.安装minconda Minconda是一个Anaconda的轻量级替代&#xff0c;默认只安装了python和conda&#xff0c;但可以通过pip和conda来安装所需要的包 1)下载 官网下载符合自己系统的版本Miniconda — conda documentation …

SpringBoot:模块探究之spring-boot-devtools

Spring Boot 使我们能够快速设置和运行服务。为了进一步增强开发体验&#xff0c;Spring 发布了 spring-boot-devtools 工具——作为 Spring Boot-1.3 的一部分 spring-boot-devtools 是 Spring Boot 提供的一组开发工具&#xff0c;可以提高开发者的工作效率&#xff0c;开发者…

软件设计中最关键的“开闭原则”,究竟指什么呢?

前言 软件设计原则中有一条很关键的原则是开闭原则&#xff0c;就是所谓的对扩展开放&#xff0c;对修改关闭。个人觉得这条原则是非常重要的&#xff0c;直接关系到你的设计是否具备良好的扩展性&#xff0c;但也是相对比较难以理解和掌握的&#xff0c;究竟怎样的代码改动才…