一文快速掌握高性能内存队列Disruptor

news2025/1/22 18:06:23

写在文章开头

Disruptor是英国外汇公司LMAX开源的一款高性能内存消息队列,理想情况下单线程可支撑600w的订单。所以本文会从使用以及设计的角度来探讨一下这款神级java消息队列。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

基础使用示例

前置步骤

我们会基于该框架实现一个简单的生产者消费者模型,在此之前我们需要引入一下依赖:

	<dependency>
                <groupId>com.lmax</groupId>
                <artifactId>disruptor</artifactId>
                <version>3.3.4</version>
            </dependency>

约定消息模型

生产者消费者沟通的媒介就是消息,所以我们首先需要创建发送消息的消息模型,这里仅仅简单创建一个对象用户记录发送的字符串消息:

@Data
public class MessageModel {
    /**
     * 消息内容
     */
    private String message;
}

基于消息模型初始化消息队列空间

Disruptor对于事件的存储进行更新操作都是基于RingBuffer,项目启动前它会基于我们的消息也就是上文的MessageModel 进行空间预初始化,所以我们需要继承EventFactory编写创建实例方法,告知Disruptor如何创建什么样消息对象空间,这一点我们可以在源码RingBufferFields的构造方法中得以印证:

RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
    	//基于sequencer完成队列数组entries 数组初始化
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

      	//......

        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //基于我们给定的MsgEventFactory完成数组内部元素空间预初始化
        fill(eventFactory);
    }

    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
        	//调用我们的工厂方法完成元素内部元素空间初始化
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

所以我们直接继承EventFactory给出消息模型创建的工厂方法:

public class MsgEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

定制消息处理器

通过继承EventHandler并指定泛型即可接手并处理MessageModel消息,逻辑比较简单,读者可自行参阅:

@Slf4j
public class MsgEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel messageModel, long sequence, boolean endOfBatch) {

        //休眠2s,模拟异步消费
        ThreadUtil.sleep(2000);

        log.info("消费者处理消息开始");
        if (messageModel != null) {
            log.info("收费者收到消息,序列号:{},消息内容:{}", sequence, messageModel.getMessage());
        }
    }
}

配置Disruptor核心参数

上述步骤完成之后,我们就可以配置环形队列RingBuffer了:

@Configuration
public class RingBufferConfig {

    @Bean
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义事件处理线程池,即消费者线程池
        ThreadFactory threadFactory = ThreadFactoryBuilder.create()
                .setNamePrefix("thread-")
                .build();

        //指定事件工厂
        MsgEventFactory msgEventFactory = new MsgEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(msgEventFactory, bufferSize, threadFactory,
                ProducerType.SINGLE, new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new MsgEventHandler());

        // 启动disruptor线程
        disruptor.start();

        //获取RingBuffer,用于接取生产者生产的事件
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }
}

实现生产者

上述步骤已经完成的Disruptor的所有创建和配置工作,注入环形队列,我们的服务就可以投递的消息了,这里我们给出对应的DisruptorMqServiceImpl 的实现代码:

@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;


    @Override
    public void sendMsg(String message) {
        log.info("投递消息,消息内容: {}", message);
        //获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            //给Event填充数据
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息队列中添加事件:{}", JSONUtil.toJsonStr(event));
        } catch (Exception e) {
            log.error("消息发送失败,失败原因 {}", e.getMessage(), e);
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}

效果展示

我们通过外部接口直接调用,可以看到MsgEventHandler接收并准确的处理了我们投递的消息:

2024-02-16 00:17:14.223  INFO 9924 --- [io-18080-exec-1] c.s.service.impl.DisruptorMqServiceImpl  : 投递消息,消息内容: demoData
2024-02-16 00:17:14.254  INFO 9924 --- [io-18080-exec-1] c.s.service.impl.DisruptorMqServiceImpl  : 往消息队列中添加事件:{"message":"demoData"}
2024-02-16 00:17:14.255  INFO 9924 --- [io-18080-exec-1] c.sharkChili.controller.TestController   : 消息队列已发送完毕
2024-02-16 00:17:16.268  INFO 9924 --- [       thread-0] com.sharkChili.handler.MsgEventHandler   : 消费者处理消息开始
2024-02-16 00:17:16.268  INFO 9924 --- [       thread-0] com.sharkChili.handler.MsgEventHandler   : 收费者收到消息,序列号:0,消息内容:demoData

Disruptor工作流程详解

Disruptor官网文档详尽给出所有核心的组件概念,详情可参考:LMAX Disruptor User Guide

这里我们以流程化的方式给出几个比较核心的概念,如下图所示,首先是生产者Producer也就是我们上文中的DisruptorMqServiceImpl通过RingBuffer获取对应序列号的消息对象MessageModel的引用将消息设置进去。此时基于等待策略等待就绪事件的对应的SequenceBarrier就会拿到这个消息的序列号并传递给消费者即EventHandlerEventHandler会基于当前收到的序列号到RingBuffer中获取对应的消息并处理。

在这里插入图片描述

这一点我们可以在BatchEventProcessor的run方法中得以印证:

@Override
    public void run()
    {
        //......
        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                	//sequenceBarrier基于等待策略获取就绪的消息序列号
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
					//拿到序列号之后从ringbuffer(也就是下文的dataProvider)获取对应的消息事件,并通过我们重写的eventHandler处理掉
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                   //......
                }
              //......
            }
        }
        finally
        {
           //......
        }
    }

详解Disruptor高效的原因

缓存填充

JDK自带的队列ArrayBlockingQueue通过上锁并阻塞线程的方式却保证生产者和消费者之间安全通信,我们的入队(在我们的场景可直接理解为消息投递)为例,可以看到put方法会先上锁如果得不到锁线程会直接进入WAIT状态,然后判断队列是否达到上限,同样的若达到上限当前线程也会被阻塞,等待队列不满时被唤醒再次进行添加操作:

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //上锁,若得不到锁则进入等待状态
        lock.lockInterruptibly();
        try {
        	//队列已满则等待未满再进行操作
            while (count == items.length)
                notFull.await();
             //入队   
            enqueue(e);
        } finally {
        	//释放锁
            lock.unlock();
        }
    }

除此之外JDK自带的ArrayBlockingQueue也没有考虑到并发场景下的伪共享问题,例如每个线程对应的CPU核心都将内存中的ArrayBlockingQueue加载到缓存行中,为了保证双方的缓存一致性,一旦一端修改了ArrayBlockingQueue,那么另一端的ArrayBlockingQueue就会被视为脏数据,这就意味着另一端的CPU需要操作ArrayBlockingQueue就需要重新从内存加载一份全新的ArrayBlockingQueue才能进行更新操作,在并发激烈的场景下,这种情况操作效率大大降低:

在这里插入图片描述

DisruptorRingBuffer中的字段RingBufferFields涉及RingBuffer中核心变量信息的记录,为了避免伪共享问题RingBufferFields继承RingBufferPad保证RingBufferFields每次被加载时前方都有7个8字节的数据填充。同理RingBuffer继承RingBufferFields在其后方填充7个8字节数据,由此保证了每一个RingBufferFields的任意字段被加载时,都有7个不可变的字段填充再任意CPU左右,避免RingBufferFields某个字段更新后,其他CPU缓存行的数据变为脏数据的缓存一致性问题:

在这里插入图片描述

对应的我们给出缓存填充的代码示例:

abstract class RingBufferPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad
{
//......
}


public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;
    //....
}

分支预测

Disruptor通过数组构成一个循环队列,它在初始化时就固定了存储空间,按照局部性原理,一次即可加载批量的元素到缓存行中,结合CPU的分支预测机制,因为数组的顺序加载规律分支预测器可以非常高效的预测并缓存下一条指令从而快速获取到数组中的下一个元素,这就是Disruptor第2个高效的原因:

Disruptor

无锁操作

进行消息批量投递和消费时,Disruptor都会按照如下步骤:

  1. 计算要获取的序列范围。
  2. CAS设置获取并更新序列号进度。
  3. CAS原子更新成功则获取并进行生产或者消费,反之循环重试CAS直至成功。

在这里插入图片描述

为了印证这一点,我们将生产模式模式改为多线程生产ProducerType.MULTI

 Disruptor<MessageModel> disruptor = new Disruptor<>(msgEventFactory, bufferSize, threadFactory,
                ProducerType.MULTI, new BlockingWaitStrategy());

我们以上文中DisruptorMqServiceImpl投递消息前获取序列号这一步作为入口查看这一过程:

 long sequence = messageModelRingBuffer.next();

可以看到底层用sequencer进行序列号自增:

@Override
    public long next()
    {
        return sequencer.next();
    }

对应MultiProducerSequencer的next方法可以看到,它会基于我们传入的n进行CAS累加操作,若成功则说明这批序列号获取成功,我们的生产者可以操作序列号对应的数组空间,从而进行消息投递,而这就是Disruptor高效的第3个原因——无锁。

@Override
    public long next(int n)
    {
      //......

        long current;
        long next;

        do
        {
        	//获取当前序列号
            current = cursor.get();
            //获取自增范围
            next = current + n;

           //......

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
              //......
            }
            //基于CAS原子更新cursor的数值,若成功则说明可以操作从current 到next这个范围的序列号的对应的数组元素空间,后续可以基于这个范围进行消息投递操作
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        //循环CAS操作直至成功
        while (true);

        return next;
    }

小结

以上便是笔者对于Disruptor剖析的全部内容,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

参考

SpringBoot + Disruptor 实现特快高并发处理,支撑每秒 600 万订单无压力:https://mp.weixin.qq.com/s/k-WiWvIQcNft_fX7uroE0A

官网文档:https://lmax-exchange.github.io/disruptor/user-guide/index.html#user-guide-models

高性能队列——Disruptor:https://tech.meituan.com/2016/11/18/disruptor.html

【计组】理解Disruptor–《计算机组成原理》(十五):https://blog.csdn.net/weixin_56814032/article/details/128999761

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1648156.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

上市企业扣非净利润是什么意思,可以反映什么问题?

扣非净利润&#xff0c;全称“扣除非经常性损益后的净利润”&#xff0c;是指企业在剔除与正常经营无关的、偶然发生的损益后所得到的利润。这些非经常性损益包括但不限于政府补贴、处置长期资产、税收返还等。 扣非净利润的计算公式为&#xff1a;扣非净利润 净利润 - 非经常…

2-手工sql注入(进阶篇) sqlilabs靶场5-10题

1. 阅读&#xff0c;学习本章前&#xff0c;可以先去看看基础篇&#xff1a;1-手工sql注入(基础篇)-CSDN博客 2. 本章通过对sqlilabs靶场的实战&#xff0c;关于sqlilabs靶场的搭建&#xff1a;Linux搭建靶场-CSDN博客 3. 本章会使用到sqlmap&#xff0c;关于sqlmap的命令&…

TC3xx MTU概述(1)

目录 1.MTU基本功能 2.MBIST 3.小结 1.MTU基本功能 在TC3xx中&#xff0c;MTU(Memory Unit Test)被用来管理控制芯片内部各种RAM的测试、初始化和数据完整性检查。 既然MTU主要是管理和控制&#xff0c;那干活的想必另有他人。所以在该平台中&#xff0c;我们可以看到SRAM…

公众号营业执照已注销,被冻结怎么换成新主体?

公众号迁移有什么作用&#xff1f;只能变更主体吗&#xff1f;长期以来&#xff0c;由于部分公众号在注册时&#xff0c;主体不准确的历史原因&#xff0c;或者公众号主体发生合并、分立或业务调整等现实状况&#xff0c;在公众号登记主体不能对应实际运营人的情况下&#xff0…

第八篇:深入探索操作系统架构:从基础到前沿

深入探索操作系统架构&#xff1a;从基础到前沿 1 引言 在当今这个高速发展的数字时代&#xff0c;操作系统无疑是计算机科学领域的基石之一。它不仅是计算机硬件与最终用户之间的桥梁&#xff0c;更是实现高效计算和资源管理的关键。操作系统的架构&#xff0c;即其内部结构和…

企业节能降耗系统,助力企业节能降耗

随着社会的发展和能源消耗的增加&#xff0c;节能降耗已经成为企业可持续发展的重要课题。为了更有效地监测和管理能源消耗&#xff0c;越来越多的企业开始使用能耗在线监测系统。作为一种节能降耗的有力手段&#xff0c;能耗在线监测系统在企业中得到广泛应用。 能耗在线监测…

AI智能分析视频监控行业的发展趋势和市场发展浅析

监控视频AI智能分析技术的现状呈现出蓬勃发展的态势&#xff0c;这一技术源于计算机视觉和人工智能的研究&#xff0c;旨在将图像与事件描述之间建立映射关系&#xff0c;使计算机能够从视频图像中分辨出目标信息。 在技术上&#xff0c;监控视频AI智能分析技术已经实现了对视…

XAMPP是什么?XAMPP好不好用?

XAMPP是一个免费且开源的软件套件&#xff0c;用于在个人计算机上轻松搭建和运行 Apache 服务器、MySQL 数据库、PHP 和 Perl&#xff0c;让用户可以在个人电脑上搭建服务器环境的平台。 XAMPP的由来是 X(表示跨平台)、Apache、MySQL、PHP 和 Perl 的首字母缩写。 它集成了这…

【隧道篇 / WAN优化】(7.4) ❀ 02. WAN优化的作用 ❀ FortiGate 防火墙

【简介】看了上一篇文章&#xff0c;相信大家都知道了在防火墙上启动WAN优化的方法&#xff0c;但是WAN优化到底能做什么&#xff1f;相信有很多人想了解。 什么是WAN优化 现在有许多企业通过集中应用程序或在云中提供应用程序来降低成本并整合资源。应用程序在本地局域网内都能…

汇凯金业:通货膨胀对能源行业有何影响

通货膨胀对能源行业有几方面的影响&#xff0c;具体取决于通货膨胀的原因、规模以及持续时间。以下是一些可能的效应&#xff1a; 成本增加&#xff1a;通货膨胀导致能源行业的运营成本上升。这包括原材料、设备、维护和人力成本。如果企业不能完全将成本转嫁给消费者&#xf…

初学python记录:力扣1329. 将矩阵按对角线排序

题目&#xff1a; 矩阵对角线 是一条从矩阵最上面行或者最左侧列中的某个元素开始的对角线&#xff0c;沿右下方向一直到矩阵末尾的元素。例如&#xff0c;矩阵 mat 有 6 行 3 列&#xff0c;从 mat[2][0] 开始的 矩阵对角线 将会经过 mat[2][0]、mat[3][1] 和 mat[4][2] 。 …

不必追求深度,浅尝辄止为宜

近日笔者撰文称&#xff0c;有幸应《百度-百家号》相邀&#xff0c;在其发起的《征文任务》栏目中写作深度文章&#xff0c;便试着开头写了一篇《万科有“活下去”的可能性吗&#xff1f;》的时评文章&#xff0c;于5月3日发表&#xff0c;舆情反映不错&#xff0c;不到三天时间…

万里牛ERP集成金蝶K3(万里牛主管供应链)

源系统成集云目标系统 金蝶K3介绍 金蝶K3是一款ERP软件&#xff0c;它集成了供应链管理、财务管理、人力资源管理、客户关系管理、办公自动化、商业分析、移动商务、集成接口及行业插件等业务管理组件。以成本管理为目标&#xff0c;计划与流程控制为主线&#xff0…

Jupyter配置

修改Jupyter打开界面 &#xff08;1&#xff09;打开【Anaconda Prompt】&#xff0c;输入【jupyter notebook --generate-config】命令 从运行结果可知【jupyter_notebook_config.py】的位置 &#xff08;2&#xff09;使用【记事本】打开 找到# c.NotebookApp.notebook_…

智能且用户友好的命令行:实时自动建议、精美标签补全 | 开源日报 No.246

fish-shell/fish-shell Stars: 24.4k License: NOASSERTION fish-shell 是一个用户友好的命令行 shell。 它是一个智能且用户友好的命令行 shell&#xff0c;适用于 macOS、Linux 以及其他相关系统。其特性包括语法高亮、实时自动建议和精美的标签补全&#xff0c;无需额外配置…

[CR]厚云填补_M3R-CR Dataset and Align-CR

Multimodal and Multiresolution Data Fusion for High-Resolution Cloud Removal: A Novel Baseline and Benchmark Abstract 去云(Cloud Removal)是遥感领域的一个重要且具有挑战性的问题&#xff0c;近年来在这一领域取得了显著进展。两个主要问题仍然阻碍着CR的发展&#…

Java设计模式 _结构型模式_代理模式(静态,动态)

一、基础概念 1、代理模式 代理模式&#xff08;Proxy Pattern&#xff09;是一种结构型设计模式。它允许我们通过添加一个代理对象来控制对另一个对象的访问&#xff0c;从而实现一些额外的功能&#xff0c;如访问控制、日志记录、性能监控等。代理模式主要分为静态代理和动态…

本地的git仓库和远程仓库

文章目录 1. 远程创建仓库2. 关联远程和本地代码3. 推送本地分支到远程 1. 远程创建仓库 2. 关联远程和本地代码 上面创建完后会得到一个git仓库的链接&#xff0c;有SSH或者http的 http://gitlab.xxxxx.local:18080/xxxxx/dvr_avm.git ssh://gitgitlab.xxxxx.local:10022/xx…

移动硬盘无法被识别怎么办?恢复移动硬盘3个正确做法

移动硬盘已成为我们日常生活和工作中不可或缺的数据存储设备。然而当移动硬盘突然无法被电脑识别时&#xff0c;往往会让人倍感焦虑。面对这种情况我们不必过于慌张&#xff0c;下面一起来看看指南解决。 解决方法一&#xff1a;检查硬件连接与供电 检查接口连接&#xff1a…

C++面向对象三大特性之---多态

一、多态的概念 多态的概念&#xff1a;通俗来说&#xff0c;就是多种形态&#xff0c;具体点就是去完成某个行为&#xff0c;当不同的对象去完成时会 产生出不同的状态。比如同样是买票的操作&#xff0c;学生买票就会打折&#xff0c;而普通的成人买票就是全款。 二、多态的…