应用disruptor队列-ringBuffer环形缓冲器

news2025/1/11 19:46:02

一disruptor介绍

Disruptor是一个高性能的消息框架,其核心是基于环形缓冲区实现的。Disruptor的设计目标是尽可能地减少线程之间的竞争和同步,从而提高系统的吞吐量和响应速度。下面让我来介绍一下在使用Disruptor中如何优雅地使用环形队列。

首先,需要明确的是,Disruptor中的环形队列与普通的环形队列有所不同。Disruptor的环形队列并不是用于存储数据,而是用于协调读写操作的顺序。具体来说,当有多个消费者同时读取队列中的元素时,Disruptor会保证每个消费者只读取到它前面的元素,这样就避免了不必要的竞争和同步。
 

二场景应用定义不同类

1.公共的生产者CommonPublishEvent

@Component
public class CommonPublishEvent {

   /**
    * 发布
    *
    * @param event
    */
   public void publishEvent(MyApplicationEvent event, RingBuffer<DisruptorEvent> ringBuffer) {
      // 发布事件
      long sequence = ringBuffer.next();
      try {
         DisruptorEvent disruptorEvent = ringBuffer.get(sequence);
         disruptorEvent.setEvent(event);
      } finally {
         ringBuffer.publish(sequence);
      }
   }

}

2.父消费者CommonEventProcessor

public class CommonEventProcessor implements WorkHandler<DisruptorEvent> {

   @Override
   public void onEvent(DisruptorEvent event) throws Exception {
      // 判断不同类型--不同的类的名称,进行处理
      HandleEventProcessor handleEventProcessor = SpringUtil.getBean(event.getEvent().getEventType());
      handleEventProcessor.execute(event);
   }
}

3.消费者实现的接口HandleEventProcessor

public interface HandleEventProcessor {
   /**
    * 事件处理
    *
    * @param event
    */
   void execute(DisruptorEvent event);
}

4.消息事件类DisruptorEvent(用于disruptor传递的信息)

@Data
public class DisruptorEvent {
   private MyApplicationEvent event;
   
}

5.消息工厂MyEventFactory

public class MyEventFactory implements EventFactory<DisruptorEvent> {
   @Override
   public DisruptorEvent newInstance() {
      return new DisruptorEvent();
   }
}

6.配置类DisruptorConfiguration

@Configuration
@ConditionalOnWebApplication
@AutoConfigureAfter({WebServerTpAutoConfiguration.class})
public class DisruptorConfiguration {
   /**
    * 默认环形缓冲器容量
    */
   public static final int DEFAULT_RING_BUFFER_SIZE = 1048576;
   /**
    * 默认工作处理程序数量
    */
   public static final int DEFAULT_WORK_HANDLER_SIZE = 4;
   /**
    * 环形缓冲器容量
    * the size of the ring buffer, must be power of 2.
    * 必须是2的幂
    * 默认:2的20次方:1048576
    */
   @Value("${disruptor.ringBufferSize}")
   private String ringBufferSize;
   /**
    * 工作处理程序数量
    * 仿照环形队列设置为2的幂,
    * 默认:4
    */
   @Value("${disruptor.workHandlerSize}")
   private String workHandlerSize;
   /**
    * dynamicTP-线程池
    */
   @Resource
   private ThreadPoolExecutor disruptorExecutor;

   /**
    * Disruptor实例
    */
   @Bean
   @ConditionalOnBean({ThreadPoolExecutor.class})
   public Disruptor<DisruptorEvent> disruptor() {
      int ringSize = StringUtils.isBlank(ringBufferSize) ? DEFAULT_RING_BUFFER_SIZE : Integer.valueOf(ringBufferSize);
      Disruptor<DisruptorEvent> disruptor = new Disruptor<>(new MyEventFactory(), ringSize, disruptorExecutor, ProducerType.SINGLE, new BlockingWaitStrategy());
      log.info("Disruptor 已初始化");
      return disruptor;
   }

   /**
    * Disruptor事件队列—环形缓冲器
    *
    * @param disruptor Disruptor实例
    * @return
    */
   @Bean
   @ConditionalOnBean({Disruptor.class})
   public RingBuffer<DisruptorEvent> ringBuffer(Disruptor<DisruptorEvent> disruptor) {
      RingBuffer<DisruptorEvent> ringBuffer;

      try {
         int workSize = StringUtils.isBlank(workHandlerSize) ? DEFAULT_WORK_HANDLER_SIZE : Integer.valueOf(workHandlerSize);
         CommonEventProcessor[] processors = new CommonEventProcessor[workSize];
         for (int i = 0; i < workSize; i++) {
            processors[i] = new CommonEventProcessor();
         }
         // 注册工作处理程序
         disruptor.handleEventsWithWorkerPool(processors);

         // 启动队列,仅启动一次
         ringBuffer = disruptor.start();
         log.info("Disruptor 队列已开启");
         return ringBuffer;
      } catch (Exception ex) {
         log.error("Disruptor 队列开启异常", ex);
      }

      return null;
   }

}

7.监听器DisruptorEventListener

@Component
public class DisruptorEventListener implements ApplicationListener<MyApplicationEvent> {
   @Resource
   private RingBuffer<DisruptorEvent> ringBuffer;

   /**
    * 事件处理类
    */
   @Resource
   private CommonPublishEvent commonPublishEvent;

   /**
    * 事件监听
    *
    * @param event
    */
   @Override
   public void onApplicationEvent(MyApplicationEvent event) {
      commonPublishEvent.publishEvent(event, ringBuffer);
   }

}


监听器的使用:https://blog.csdn.net/m0_54355172/article/details/128592476

8应用实例

      系统启动后 监听器开始监听环形队列里的事件,一旦从系统别处生产触发时,放入环形队列中,然后监听器监听到后从队列中拿出,最后根据事件传过来的EventType(类名)执行不同的业务逻辑(消费者)。

9相关依赖

disruptor 

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

dynamicTP-线程池 

<dependency>
    <groupId>org.dromara.dynamictp</groupId>
    <artifactId>dynamic-tp-spring-boot-starter-adapter-webserver</artifactId>
    <version>1.1.5</version>
</dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1219283.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端学习笔记--TypeScript

1. typescript是什么 Typescript是由微软开发的一款开源的编程语言Typescript是Javascript的超集&#xff0c;遵循最新的ES5/ES6规范。TypeScript扩展了Javascript语法TypeScript更像后端Java、C#这样的面向对象语言可以让JS开发大型企业应用越来越多的项目是基于TS的&#xf…

喜讯 客户工艺线顺利通线

带你了解CiMEMS微纳制造工艺线 随着国内智能网联汽车、智能终端、可穿戴设备与消费电子的高速发展&#xff0c;以集成微纳系统&#xff08;Micro-electro-mechanical Systems&#xff0c;MEMS&#xff09;为代表的主要应用于激光雷达、汽车电子、环境感知与智能传感器的芯片&a…

释放搜索潜力:基于Milvus快速搭建语义检索系统(快速版),让信息尽在掌握

搜索推荐系统专栏简介:搜索推荐全流程讲解(召回粗排精排重排混排)、系统架构、常见问题、算法项目实战总结、技术细节以及项目实战(含码源) 专栏详细介绍:搜索推荐系统专栏简介:搜索推荐全流程讲解(召回粗排精排重排混排)、系统架构、常见问题、算法项目实战总结、技术…

利用IP地址查询优化保险理赔与业务风控的实用方法

随着数字化时代的到来&#xff0c;保险行业正逐渐采用先进的技术来改善理赔流程和强化业务风控。其中&#xff0c;通过IP地址查询成为一种有效的手段&#xff0c;为保险公司提供更精准的信息&#xff0c;以便更好地管理风险和提高服务效率。本文将探讨如何利用IP地址查询优化保…

立创EDA导出封装给PADS9.5使用

立创EDA导出封装给PADS9.5使用 前言 因为更换了新环境&#xff0c;需要使用PADS9.5进行电路设计&#xff0c;但是因为之前一直使用的是立创EDA、AD18&#xff0c;这会导致原先的元件库丢失&#xff0c;同时无法享受立创EDA丰富的封装库资源&#xff0c;因此记录一下如何将立创…

Docker-compose 下载安装测试完成

源文件-http://t.csdnimg.cn/7NxHchttp://t.csdnimg.cn/7NxHc 1 docker-compose说明 Docker Compose 是Docker的组装工具&#xff0c;用于创建和调试多个Docker容器&#xff0c;并在同一个Docker主机上运行它们。Docker Compose基于YAML文件&#xff0c;描述多个容器之间的相…

原论文一比一复现 | 更换 RT-DETR 主干网络为 【VGG13】【VGG16】【VGG19】| 对比实验必备

本专栏内容均为博主独家全网首发,未经授权,任何形式的复制、转载、洗稿或传播行为均属违法侵权行为,一经发现将采取法律手段维护合法权益。我们对所有未经授权传播行为保留追究责任的权利。请尊重原创,支持创作者的努力,共同维护网络知识产权。 论文地址:https://arxiv.o…

Wordpress页面生成器:Elementor 插件制作网站页面教程(图文完整)

本文来教大家怎么使用Wordpress Elementor页面编辑器插件来自由创建我们的网页内容。很多同学在面对建站的时候,一开始都是热血沸腾信心满满的,等到实际上手的时候就会发现有很多问题都是无法解决的,希望本篇Elementor插件使用指南能够帮助到你。 Wordpress Elementor页面编…

【JVM】Java虚拟机

本文主要介绍了JVM的内存区域划分,类加载机制以及垃圾回收机制. 其实JVM的初心,就是让java程序员不需要去了解JVM的细节,它把很多工作内部封装好了.但是学习JVM的内部原理有利于我们深入理解学习Java. 1.JVM的内存区域划分 JVM其实是一个java进程 ; 每个java进程,就是一个jvm…

茶百道:门店数量狂飙,食品安全问题成最大绊脚石

茶百道近日传出即将在香港进行非交易路演&#xff0c;计划在今年内登陆港交所上市&#xff0c;消息一出引发市场广泛关注。然而&#xff0c;茶百道的上市能否成为其自救的解药&#xff0c;还存在诸多质疑。 茶百道的惊人营收增长背后&#xff0c;门店数量的迅速扩张功不可没。在…

Windows10安装麒麟桌面V10双系统

概述 想要在Windows10操作系统中安装麒麟V10的桌面操作系统&#xff08;Kylin-Desktop-V10-Professional-Release-Build1-210203-X86_64&#xff09; 安装前准备 1、先搞清楚自己的电脑类型 A MBR传统bios单硬盘 B MBR 传统bios双硬盘&#xff08;SSD固态硬盘机械硬盘&…

PatchMatchNet笔记

PatchMatchNet笔记 1 概述2 PatchmatchNet网络结构图2.1 多尺度特征提取2.2 基于学习的补丁匹配 3 性能评价 PatchmatchNet: Learned Multi-View Patchmatch Stereo&#xff1a;基于学习的多视角补丁匹配立体算法 1 概述 特点   高速&#xff0c;低内存&#xff0c;可以处理…

SQLMAP --TAMPER的编写

跟着师傅的文章进行学习 sqlmap之tamper脚本编写_sqlmap tamper编写-CSDN博客 这里学习一下tamper的编写 这里的tamper 其实就是多个绕过waf的插件 通过编写tamper 我们可以学会 在不同过滤下 执行sql注入 我们首先了解一下 tamper的结构 这里我们首先看一个最简单的例子…

python基于DETR(DEtection TRansformer)开发构建人员手持物品检测识别分析系统

PyTorch训练代码和DETR&#xff08;DEDetection-TRansformer&#xff09;的预训练模型。我们用Transformer替换了完全复杂的手工制作的对象检测管道&#xff0c;并将Faster R-CNN与ResNet-50匹配&#xff0c;使用一半的计算能力&#xff08;FLOP&#xff09;和相同数量的参数在…

React升级到18版本

前言 升级前react版本是16.9.0&#xff0c;react-dom版本为16.9.0&#xff0c;react-router-dom为5.1.2版本。 安装 // npm npm install react react-dom// yarn yarn add react react-dom// pnpm pnpm install react react-dom启动项目 此时&#xff0c;项目可以正常运行&…

【python】——控制语句和组合数据类型(其二)

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

【开源】基于JAVA的服装店库存管理系统

项目编号&#xff1a; S 052 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S052&#xff0c;文末获取源码。} 项目编号&#xff1a;S052&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 角色管理模块2.3 服…

OpenCV技术应用(3)— 把.png图像保存为.jpg图像

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。本节课就手把手教你如何把.png图像保存为.jpg图像&#xff0c;希望大家学习之后能够有所收获~&#xff01;&#x1f308; 目录 &#x1f680;1.技术介绍 &#x1f680;2.实现代码 &#x1f680;1.技术介绍 如果在电脑某…

image is being used by stopped container 7d2ff8620f3b 删除镜像失败怎么办

这个错误信息表明&#xff0c;镜像 55860ee0cd73 正被一个已停止的容器 7d2ff8620f3b 使用&#xff0c;因此无法正常删除。要解决这个问题&#xff0c;你有两个选择&#xff1a; 删除使用该镜像的容器&#xff1a;首先删除引用该镜像的容器&#xff0c;然后再删除镜像。这可以通…

到站上海!见证这座零碳园区的绿色低碳新选择

不知不觉中&#xff0c;科士达新能源的零碳足迹已遍布五洲四海&#xff0c;为全球各地&#xff0c;千行百业、千家万户&#xff0c;带去了源源不断的绿色能源和低碳新选择。再次启航&#xff0c;这一站&#xff0c;抵达上海世博园。 小机身&#xff0c;大配置&#xff0c;灵活适…