【开源项目】Disruptor框架介绍及快速入门

news2024/11/18 15:29:08

Disruptor框架简介

Disruptor框架内部核心的数据结构是Ring Buffer,Ring Buffer是一个环形的数组,Disruptor框架以Ring Buffer为核心实现了异步事件处理的高性能架构;JDK的BlockingQueue相信大家都用过,其是一个阻塞队列,内部通过锁机制实现生产者和消费者之间线程的同步。跟BlockingQueue一样,Disruptor框架也是围绕Ring Buffer实现生产者和消费者之间数据的交换,只不过Disruptor框架性能更高,笔者曾经在同样的环境下拿Disruptor框架跟ArrayBlockingQueue做过性能测试,Disruptor框架处理数据的性能比ArrayBlockingQueue的快几倍。

Disruptor框架性能为什么会更好呢?其有以下特点:

  1. 预加载内存可以理解为使用了内存池;
  2. 无锁化
  3. 单线程写
  4. 消除伪共享
  5. 使用内存屏障
  6. 序号栅栏机制

相关概念

  • Disruptor:是使用Disruptor框架的核心类,持有RingBuffer、消费者线程池、消费者集合ConsumerRepository和消费者异常处理器ExceptionHandler等引用;

  • Ring Buffer: RingBuffer处于Disruptor框架的中心位置,其是一个环形数组,环形数组的对象采用预加载机制创建且能重用,是生产者和消费者之间交换数据的桥梁,其持有Sequencer的引用;

  • Sequencer: Sequencer是Disruptor框架的核心,实现了所有并发算法,用于生产者和消费者之间快速、正确地传递数据,其有两个实现类SingleProducerSequencer和MultiProducerSequencer。

  • Sequence:Sequence被用来标识Ring Buffer和消费者Event Processor的处理进度,每个消费者Event Processor和Ring Buffer本身都分别维护了一个Sequence,支持并发操作和顺序写,其也通过填充缓存行的方式来消除伪共享从而提高性能。

  • Sequence Barrier:Sequence Barrier即为序号屏障,通过追踪生产者的cursorSequence和每个消费者( EventProcessor)的sequence的方式来协调生产者和消费者之间的数据交换进度,其实现类ProcessingSequenceBarrier持有的WaitStrategy等待策略类是实现序号屏障的核心。

  • Wait Strategy:Wait Strategy是决定消费者如何等待生产者的策略方式,当消费者消费速度过快时,此时是不是要让消费者等待下,此时消费者等待是通过锁的方式实现还是无锁的方式实现呢?

  • Event Processor:Event Processor可以理解为消费者线程,该线程会一直从Ring Buffer获取数据来消费数据,其有两个核心实现类:BatchEventProcessor和WorkProcessor。

  • Event Handler:Event Handler可以理解为消费者实现业务逻辑的Handler,被BatchEventProcessor类引用,在BatchEventProcessor线程的死循环中不断从Ring Buffer获取数据供Event Handler消费。

  • Producer:生产者,一般用RingBuffer.publishEvent来生产数据。

快速入门

MQManager启用Disruptor,返回RingBuffer实例。

@Configuration
public class MQManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理  
        ExecutorService executor = Executors.newFixedThreadPool(2);

        //指定事件工厂  
        HelloEventFactory factory = new HelloEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率  
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能  
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                ProducerType.SINGLE, new BlockingWaitStrategy());

        //设置事件业务处理器---消费者  
        disruptor.handleEventsWith(new HelloEventHandler());

        // 启动disruptor线程  
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件  
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }
}

MessageModel消息实体类

@Data
public class MessageModel {  
    private String message;  
}

工厂类

public class HelloEventFactory implements EventFactory<MessageModel> {
    @Override  
    public MessageModel newInstance() {  
        return new MessageModel();  
    }  
}  

消息处理器

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
    @Override  
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {  
        try {  
            log.info("消费者处理消息开始");  
            if (event != null) {  
                log.info("消费者消费的信息是:{}",event);  
            }  
        } catch (Exception e) {  
            log.info("消费者处理消息失败");  
        }  
        log.info("消费者处理消息结束");  
    }  
}  

消息发送

@Slf4j
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {  
  
    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;
  
    @Override  
    public void sayHelloMq(String message) {  
        log.info("record the message: {}",message);  
        //获取下一个Event槽的下标  
        long sequence = messageModelRingBuffer.next();  
        try {  
            //给Event填充数据  
            MessageModel event = messageModelRingBuffer.get(sequence);  
            event.setMessage(message);  
            log.info("往消息队列中添加消息:{}", event);  
        } catch (Exception e) {  
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());  
        } finally {  
            //发布Event,激活观察者去消费,将sequence传递给改消费者  
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer  
            messageModelRingBuffer.publish(sequence);  
        }  
    }  
} 

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/517344.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

视觉错觉图像可逆信息隐藏

—————————————————————————————————————————————————————————— 文献学习&#xff1a;视觉错觉图像可逆信息隐藏 [1] Jiao S , Jun F . Image steganography with visual illusion[J]. Optics Express, 2021, 29(10…

【算法与数据结构】栈

栈 栈&#xff1a;结构定义 放入元素是从底向上放入 有一个栈顶指针&#xff0c;永远处在栈顶的元素 还需要标记栈大小的size 栈的性质&#xff1a; Fisrt-in Last-out (FILO) 先进后出 栈改变元素的顺序 栈&#xff1a;出栈 让栈顶指针向下移动一位 栈&#xff1a;入栈 …

【JavaEE】SpringMVC

目录 SpringMVC 获取连接 RequestMapping / GetMapping... 获取参数 获取querystring中的参数(获取表单数据基本相同) 获取URL中的参数 获取JSON对象 获取文件(通过表单) 获取Cookie 获取Header 获取Session 返回数据 返回数据 返回JSON对象 返回静态页面 请求…

云渲染时可以关机吗_云渲染电脑可以关闭吗?

云渲染可简单理解为放在云端的渲染农场&#xff0c;可区别于用户本地自己搭建的小型私有农场&#xff0c;用户只需将自己制作好的项目文件进行打包&#xff0c;通过 云渲染平台提供的客户端或网页端将文件上传到云端进行渲染。很多用户通过云渲染作业&#xff0c;解放了自己本地…

深耕5G+AIoT产业赛道,2023高通&美格智能物联网技术开放日隆重举行

5月11日&#xff0c;高通技术公司携手美格智能联合举办了“高通&美格智能物联网技术开放日”深圳站活动。大会现场&#xff0c;智能物联网行业合作伙伴齐聚一堂&#xff0c;围绕5GAIoT前沿技术&#xff0c;通过大咖专业的技术分享、落地应用介绍和现场丰富的产品展示&#…

Pytorch nn.Softmax(dim=?) 详解

本文参考自&#xff1a;Pytorch nn.Softmax(dim?) - 知乎 原文写得很好了&#xff0c;我这边另外完善了一些细节&#xff0c;让大家理解地更加直白一些。 可以先去看上面的参考文章&#xff0c;也可以直接看我这篇。 目录 1、tensor1 1&#xff09;已知该矩阵的维度为&am…

vue实现聊天框自动滚动

需求 1、聊天数据实时更新渲染到页面 2、页面高度随聊天数据增加而增加 3、竖向滚动 4、当用户输入聊天内容或者接口返回聊天内容渲染在页面后&#xff0c;自动滚动到底部 5、提供点击事件操控滚动条上下翻动 环境依赖 vue&#xff1a;vue…

两小时搭建属于自己的chatGPT(ChatGLM)免硬件(白嫖)

目录 准备&#xff08;注册&#xff09;: 搭建: API模式: 测试&#xff1a; 总结&#xff1a; 准备&#xff08;注册&#xff09;: 注册modelscope(白嫖)免费使用服务器 https://modelscope.cn/ 按照图片里的选择(选择其他好像不能创建成功) 可以白嫖60多个小时的配置 8…

Java 8 Time 关于java.time包中你可能不知道的使用细节

目录 前言一、时区与时间1. 世界标准时&#xff1a;UTC、GMT、UT2. 地区时&#xff1a;Asia/Shanghai、UTC83. 时区&#xff1a;ZoneId、TimeZone4. 时间偏移量&#xff1a;ZoneOffset5. 时区简称&#xff1a;CTT、PRC 二、主要时间类1. 重要时间接口&#xff1a;Temporal2. 时…

【CocosCreator入门】CocosCreator组件 | Collider(碰撞)组件

Cocos Creator是一款流行的游戏开发引擎&#xff0c;具有丰富的组件和工具&#xff0c;其中碰撞系统组件是该引擎的重要组成部分。该组件可用于检测游戏中各个元素之间的碰撞&#xff0c;例如玩家角色与敌人、子弹与障碍物等。 目录 一、组件介绍 二、组件属性 2.1BoxCollid…

基于SpringBoot+微信小程序的农产品销售平台

基于SpringBoot微信小程序的农产品销售平台 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目…

Test Doubles测试替身: Testing in Distributed Systems and Real World

什么是Test Doubles In software testing, we developed unit tests and integration tests to test the codes functionality. However, in the real world, it is very common for a piece of code to interact with external components, for example, databases, public A…

【人工智能概论】pyplot作图中文显示、逐点坐标显示、保存图像

【人工智能概论】pyplot作图中文显示、逐点标记、保存图像 文章目录 【人工智能概论】pyplot作图中文显示、逐点标记、保存图像一. 简单的绘图二. 逐点坐标显示三. 中文显示四. 中文显示可能遇到的问题——缺少字体4.1 下载 SimHei.ttf4.2 复制 SimHei.ttf 到 Matplotlib 的 fo…

好的Robots.txt设计对Google收录有很大的帮助

Robots.txt 文件是用于指导搜索引擎爬虫在网站上爬行的标准。正确地设计 Robots.txt 文件可以帮助 Google 爬虫更好地理解您的网站结构&#xff0c;从而提高您的网站在 Google 搜索引擎上的收录率。 以下是一些设计 Robots.txt 文件的技巧&#xff0c;可以帮助 Google 爬虫更好…

security 报错:There is no PasswordEncoder mapped for the id “null“

security在登录的时候 无法登录成功 首先解读错误 下面百度翻译 安全框架设置了登录验证 说你没有密码编辑器 解决方法 一: 往容器中注册一个PasswordEncoder 解决方法二: 设置用户权限和角色的时候添加方法,加进去一个PasswordEncoder 只需要解决方案的话 下面的内容…

K8S系列之污点和容忍度详细分析

架构图 本篇文档主要介绍污点和容忍度的关系。 污点和容忍度 污点顾名思义就是脏的东西&#xff0c;给节点添加污点来限制pod调度到该节点上&#xff0c;如果pod可以容忍这种污点就可以被调度到有污点的节点上&#xff0c;如果不能容忍就不能被调度到该节点上。 污点作用于节…

排队领奖模式吸引新消费者,电商平台如何创新引流拓客?

在当前的电商市场中&#xff0c;由于竞争日趋激烈&#xff0c;很多电商平台产生了引流拓客缺乏新意的难题&#xff0c;即很难找到新的流量&#xff0c;并且难以把这些流量转化为消费者。在这个瞬息万变的时代&#xff0c;当然是谁有创意谁能吸引消费者&#xff0c;谁才能当道。…

Sequence-to-Sequence Knowledge Graph Completion and Question Answering

[2203.10321] Sequence-to-Sequence Knowledge Graph Completion and Question Answering (arxiv.org) 目录 1 Abstract 2 Introduction 3 KGT5 Model 3.1 Textual Representations & Verbalization 3.2 Training KGT5 for Link Prediction 3.3 Link Prediction Inf…

Inception Network

文章目录 一、Inception Network简介二、CNN的痛点三、Inception Network1. 1x1卷积核1.1 升维/降维&#xff1a;1.2. 调节参数数量&#xff1a;1.3. 增加非线性特性&#xff1a; 2. Inception原始模型3. Inception Module4. Inception Network 四、代码示例 一、Inception Net…

接口自动化测试 vs. UI自动化测试:为什么前者更快,更省力,更稳定?

从入门到精通&#xff01;企业级接口自动化测试实战&#xff0c;详细教学&#xff01;&#xff08;自学必备视频&#xff09; 目录 前言&#xff1a; 一、什么是接口自动化测试和 UI 自动化测试 二、为什么接口自动化测试效率比 UI 自动化测试高 1.执行速度 2.维护成本 3.…