构建高性能内存队列:Disruptor 永远滴神~

news2024/12/29 1:23:35

我们清楚使用锁的性能比较低,尽量使用无锁设计。接下来就我们来认识下Disruptor。

Disruptor简单使用

先简单介绍下:

  • Disruptor它是一个开源的并发框架,并获得2011 Duke’s程序框架创新奖【Oracle】,能够在无锁的情况下实现网络的Queue并发操作。英国外汇交易公司LMAX开发的一个高性能队列,号称单线程能支撑每秒600万订单~
  • 日志框架Log4j2 异步模式采用了Disruptor来处理
  • 局限呢,他就是个内存队列,也就是说无法支撑分布式场景。

简单使用

数据传输对象

@Data
public class EventData {
    private Long value;
}
复制代码

消费者

public class EventConsumer implements WorkHandler<EventData> {

    /**
     * 消费回调
     * @param eventData
     * @throws Exception
     */
    @Override
    public void onEvent(EventData eventData) throws Exception {
        Thread.sleep(5000);
        System.out.println(Thread.currentThread() + ", eventData:" + eventData.getValue());
    }
}
复制代码

生产者

public class EventProducer {

    private final RingBuffer<EventData> ringBuffer;

    public EventProducer(RingBuffer<EventData> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void sendData(Long v){
        // cas展位
        long next = ringBuffer.next();
        try {
            EventData eventData = ringBuffer.get(next);
            eventData.setValue(v);
        } finally {
            // 通知等待的消费者
            System.out.println("EventProducer send success, sequence:"+next);
            ringBuffer.publish(next);
        }
    }
}
复制代码

测试类

public class DisruptorTest {

    public static void main(String[] args) {
        // 2的n次方
        int bufferSize = 8;

        Disruptor<EventData> disruptor = new Disruptor<EventData>(
                () -> new EventData(), // 事件工厂
                bufferSize,            // 环形数组大小
                Executors.defaultThreadFactory(),       // 线程池工厂
                ProducerType.MULTI,    // 支持多事件发布者
                new BlockingWaitStrategy());    // 等待策略

        // 设置消费者
        disruptor.handleEventsWithWorkerPool(
                new EventConsumer(),
                new EventConsumer(),
                new EventConsumer(),
                new EventConsumer());

        disruptor.start();

        RingBuffer<EventData> ringBuffer = disruptor.getRingBuffer();
        EventProducer eventProducer = new EventProducer(ringBuffer);
        long i  = 0;
        for(;;){
            i++;
            eventProducer.sendData(i);
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
复制代码

核心组件

基于上面简单例子来看确实很简单,Disruptor帮我们封装好了生产消费模型的实现,接下来我们来看下他是基于哪些核心组件来支撑起一个高性能无锁队列呢?

RingBuffer: 环形数组,底层使用数组entries,在初始化时填充数组,避免不断新建对象带来的开销。后续只会对entries做更新操作

Sequencer: 核心管家

  • 定义生产同步的实现:SingleProducerSequencer单生产、MultiProducerSequencer多生产

  • 当前写的进度Sequence cursor

  • 所有消费者进度的数组Sequence[] gatingSequences

  • MultiProducerSequencer可用区availableBuffer【利用空间换取查询效率】

Sequence: 本身就是一个序号器用来标识处理进度,也可以当做是一个atomicInteger; 还有另外一个特点,为了解决伪共享问题而引入的:缓存行填充。这个在后面介绍。

workProcessor: 处理Event的循环,在循环中获取Disruptor的事件,然后把事件分配给各个handler

EventHandler: 负责业务逻辑的handler,自己实现。

WaitStrategy: 消费者 如何等待 事件的策略,定义了如下策略

  • leepingWaitStrategy:自旋 + yield + sleep

  • BlockingWaitStrategy:加锁,适合CPU资源紧张(不需要切换线程),系统吞吐量无要求的

  • YieldingWaitStrategy:自旋 + yield + 自旋

  • BusySpinWaitStrategy:自旋,减少线程之前切换

  • PhasedBackoffWaitStrategy:自旋 + yield + 自定义策略

关注公众号:码猿技术专栏 每天定时推送更多精彩内容

带着问题来解析代码?

1、多生产者如何保证消息生产不会相互覆盖。【如何达到互斥效果】

每个线程获取不同的一段数组空间,然后通过CAS判断这段空间是否已经分配出去。

接下来我们看下多生产类MultiProducerSequencer中next方法【获取生产序号】

// 消费者上一次消费的最小序号 // 后续第二点会讲到
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 当前进度的序号
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 所有消费者的序号 //后续第二点会讲到
protected volatile Sequence[] gatingSequences = new Sequence[0];

 public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do
        {
            // 当前进度的序号,Sequence的value具有可见性,保证多线程间线程之间能感知到可申请的最新值
            current = cursor.get();
            // 要申请的序号空间:最大序列号
            next = current + n;
  
            long wrapPoint = next - bufferSize;
            // 消费者最小序列号
            long cachedGatingSequence = gatingSequenceCache.get();
            // 大于一圈 || 最小消费序列号>当前进度
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                // 说明大于1圈,并没有多余空间可以申请
                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }
                // 更新最小值到Sequence的value中
                gatingSequenceCache.set(gatingSequence);
            }
            // CAS成功后更新当前Sequence的value
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);
        return next;
    }
复制代码

2、生产者向序号器申请写的序号,如序号正在被消费,Sequencer是如何知道哪些序号是可以被写入的呢?【未消费则被覆盖如何处理】

从gatingSequences中取得最小的序号,生产者最多能写到这个序号的后一位。通俗来讲就是申请的序号不能大于最小消费者序号一圈【申请到最大序列号-buffersize 要小于/等于 最小消费的序列号】的时候, 才能申请到当前写的序号

public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
    return createWorkerPool(new Sequence[0], workHandlers);
}


EventHandlerGroup<T> createWorkerPool(
    final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);


    consumerRepository.add(workerPool, sequenceBarrier);

    final Sequence[] workerSequences = workerPool.getWorkerSequences();

    updateGatingSequencesForNextInChain(barrierSequences, workerSequences);

    return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
}

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
    if (processorSequences.length > 0)
    {
        // 消费者启动后就会将所有消费者存放入AbstractSequencer中gatingSequences
        ringBuffer.addGatingSequences(processorSequences);
        for (final Sequence barrierSequence : barrierSequences)
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}
复制代码

3、在多生产者情况下,生产者是申请到一段可写入的序号,然后再写入这些序号中,那么消费者是如何感知哪些序号是可以被消费的呢?【借问提1图说明】

这个前提是多生产者情况下,第一点我们说过每个线程获取不同的一段数组空间,那么现在单单通过序号已经不够用了,MultiProducerSequencer使用了int 数组 【availableBuffer】来标识当前序号是否可用。当生产者成功生产事件后会将availableBuffer中当前序列号置为1标识可以读取。

如此消费者可以读取的的最大序号就是我们availableBuffer中第一个不可用序号-1。

初始化availableBuffer流程

public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
    // 初始化可用数组
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}
// 初始化默认availableBuffer为-1
private void initialiseAvailableBuffer()
{
    for (int i = availableBuffer.length - 1; i != 0; i--)
    {
        setAvailableBufferValue(i, -1);
    }

    setAvailableBufferValue(0, -1);
}

// 生产者成功生产事件将可用区数组置为1
public void publish(final long sequence)
{
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking();
}

private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
复制代码

消费者消费流程

WorkProcessor类中消费run方法
public void run()
    {
        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                // 先通过cas获取消费事件的占有权
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                // 数据就绪,可以消费
                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    // 触发回调函数
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {
                    // 获取可以被读取的下标
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
        // ....省略
        }

        notifyShutdown();

        running.set(false);
    }
    
    
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();
        // 这个值获取的current write 下标,可以认为全局消费下标。此处与每一段的write1和write2下标区分开
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }
        // 通过availableBuffer筛选出第一个不可用序号 -1
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
    
    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        // 从current read下标开始, 循环至 current write,如果碰到availableBuffer 为-1 直接返回
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
        {
            if (!isAvailable(sequence))
            {
                return sequence - 1;
            }
        }

        return availableSequence;
    }
复制代码

解决伪共享问题

什么是伪共享问题呢?

为了提高CPU的速度,Cpu有高速缓存Cache,该缓存最小单位为缓存行CacheLine,他是从主内存复制的Cache的最小单位,通常是64字节。一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量。如果你访问一个long数组,当数组中的一个值被加载到缓存中,它会额外加载另外7个。因此你能非常快地遍历这个数组。

关注公众号:码猿技术专栏 每天定时推送更多精彩内容

伪共享问题是指,当多个线程共享某份数据时,线程1可能拉到线程2的数据在其cache line中,此时线程1修改数据,线程2取其数据时就要重新从内存中拉取,两个线程互相影响,导致数据虽然在cache line中,每次却要去内存中拉取。

Disruptor是如何解决的呢?

在value前后统一都加入7个Long类型进行填充,线程拉取时,不论如何都会占满整个缓存

回顾总结:Disuptor为何能称之为高性能的无锁队列框架呢?

  • 缓存行填充,避免缓存频繁失效。【java8中也引入@sun.misc.Contended注解来避免伪共享】
  • 无锁竞争:通过CAS 【二阶段提交】
  • 环形数组:数据都是覆盖,避免GC
  • 底层更多的使用位运算来提升效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/77084.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

单机存储系统可靠性及相关技术介绍

一、存储系统可靠性的影响因素单机存储系统包括存储硬件和存储软件。存储硬件又包含存储介质、存储控制器、设备固件&#xff1b;存储软件栈层次则更为复杂&#xff0c;以Linux为例包括&#xff1a;存储设备驱动层、 块设备层(Block Layer)、可选的虚拟块设备层(Device Mapper)…

新手使用wvp-pro和zlm的菜鸟说明(手把手教)

对于wvp-pro的使用&#xff0c;很多大佬都是白嫖菜鸟党&#xff0c;很多都第一次使用wvp&#xff0c;甚至第一次接触国标&#xff0c;连国标最基本流程都不清楚。所以写此文档以供各位菜鸟大佬点评指正 看此文档前提&#xff1a; 第一&#xff1a;先看三遍zlm和wvp的wiki&…

【光照感知子场:差分感知融合模块与中间融合策略相结合】

PIAFusion: A progressive infrared and visible image fusion network based on illumination aware 本文提出了一种基于光照感知的渐进式图像融合网络PIAFusion&#xff0c;自适应地保持显著目标的亮度分布和背景的纹理信息。具体而言&#xff0c;我们设计了一个光照感知子网…

【Java基础篇】基础知识易错集锦(一)

在学习的路上&#xff0c;我们只记得学习新的知识&#xff0c;却忽略了一切新知识都是在旧知识的基础上&#xff1b;努力奔跑的过程中&#xff0c;也要记得常回头看看&#xff1b; 题目展示&#xff1a; 解析&#xff1a; abstract是抽象的意思&#xff0c;在java中&#xff0…

【Vue 快速入门】使用vue脚手架创建一个项目

文章目录一、环境检查1.安装node环境2.脚手架配置3.不同版本vue介绍二、创建项目三、脚手架配置解说1.配置解说2.我的第一个vue程序一、环境检查 1.安装node环境 Node.js发布于2009年5月&#xff0c;由Ryan Dahl开发&#xff0c;是一个基于Chrome V8引擎的JavaScript运行环境…

Flutter - AlignmentGeometry :Alignment 和 FractionalOffset

AlignmentGeometry 是一个抽象类&#xff0c;它有两个常用的子类&#xff1a;Alignment和 FractionalOffset Alignment Alignment继承自AlignmentGeometry&#xff0c;表示矩形内的一个点&#xff0c;他有两个属性x、y&#xff0c;分别表示在水平和垂直方向的偏移 上图中 Flu…

阳光保险港交所上市:年营收1200亿 市值超600亿港元

雷递网 雷建平 12月9日阳光保险集团股份有限公司 (简称&#xff1a;“阳光保险”&#xff0c;06963)今日在港交所上市&#xff0c;发行价为每股5.83港元&#xff0c;募资净额为64.195亿港元。若行使超额配股权&#xff0c;阳光保险可额外再募资9.81亿港元。阳光保险发行价为5.8…

软件测试 -- 进阶 7 软件测试环境构建 与 测试数据准备

工欲善其事&#xff0c;必先利其器。-- 《论语卫灵公》 释译&#xff1a;工匠想要工作做好&#xff0c;一定要先让工具锋利。比喻要做好一件事&#xff0c;准备工作非常重要。 1. 为什么要构建测试环境、准备测试数据 提前准备测试所需资源保证测试有效执行保证测试用序执…

ChatGPT新玩法来了,微信聊天机器人

前言 上一篇文章中说了ChatGPT是什么&#xff0c;然后怎么注册使用。 传送门&#xff1a;花了1块钱体验一把最近很火的ChatGPT 但是实际操作下来还是有不少小伙伴跟我一样遇到各种坑。 没有科学上网工具OpenAI的服务在你的国家无法使用&#xff08;最多的问题&#xff09; 注…

自动驾驶之夜间检测调研

1. ExDark 第一个 公开 特定的提供 natural low-light images for object的数据集 7363张 low-light images, 12 classes Low-light image enhancement: IVC database. general image enhancement而非特指low-light. 黑夜是人工合成的,可以找到原图像See-in-the-Dark datase…

如何创建Spring项目以及如何使用?

目录&#xff1a; 1.创建Spring项目 2.将对象存储在Spring中 3.从Spring中取出对象 4.使用对象 5.总结 Spring 就是⼀个包含了众多⼯具⽅法的 IoC 容器&#xff0c;它具备两个最基本的功能&#xff1a; 将对象存储到容器&#xff08;Spring&#xff09;中&#xff1b;从容器…

Web大学生网页作业成品——美食餐饮网站设计与实现(HTML+CSS+JavaScript)

&#x1f468;‍&#x1f393;静态网站的编写主要是用HTML DIVCSS JS等来完成页面的排版设计&#x1f469;‍&#x1f393;,常用的网页设计软件有Dreamweaver、EditPlus、HBuilderX、VScode 、Webstorm、Animate等等&#xff0c;用的最多的还是DW&#xff0c;当然不同软件写出的…

java SpringMVC 之 表现层与前端数据传输 SSM整合 异步处理前后台处理联调 拦截器

SSM整合 项目结构配置搭建 pom的依赖&#xff1a; <dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><de…

如何使用 Python 实现彩票自由(双色球)

福彩双色球也是购买次数最多的彩种之一&#xff0c;相比大乐透&#xff0c;双色球更容易中小奖 下面将介绍 Python 实习双色球彩票自由的流程 1. 随机一注 福彩双色球一注同样包含 7 个数字&#xff0c;包含 6 个红球和 1 个篮球 其中 红球是从 1 - 33 中选择 6 个不同的数…

【Keras+计算机视觉+Tensorflow】生成对抗神经网络中DCGAN、CycleGAN网络的讲解(图文解释 超详细)

觉得有帮助麻烦点赞关注收藏~~~ 一、生成对抗网络简介 生成对抗网络(GANs&#xff0c;Generative Adversarial Nets),由Ian Goodfellow在2014年提出的,是当今计算机科学中最有趣的概念之一。GAN最早提出是为了弥补真实数据的不足&#xff0c;生成高质量的人工数据。GAN的主要思…

数据宝藏“淘金热”,腾讯云大数据愿做“卖铲人”

‍‍数据智能产业创新服务媒体——聚焦数智 改变商业大数据产业作为数字化时代的基础设施之一&#xff0c;正在成为新时代经济发展的重要动能之一。11月30日&#xff0c;在2022腾讯全球数字生态大会大数据专场上&#xff0c;腾讯云大数据重磅发布了两款具有高频应用场景的产品…

晶品特装科创板上市:市值68亿 主打地面无人装备研发与产销

雷递网 雷建平 12月9日北京晶品特装科技股份有限公司&#xff08;简称&#xff1a;“晶品特装”&#xff0c;证券代码&#xff1a;688084&#xff09;昨日在科创板上市。晶品特装本次发行1900万股&#xff0c;发行价为60.98元&#xff0c;募资总额11.59亿元。晶品特装昨日收盘价…

R语言从经济时间序列中用HP滤波器,小波滤波和经验模态分解等提取周期性成分分析

经济时间序列的分析通常需要提取其周期性成分。最近我们被客户要求撰写关于经济时间序列的研究报告&#xff0c;包括一些图形和统计输出。这篇文章介绍了一些方法&#xff0c;可用于将时间序列分解为它们的不同部分。它基于《宏观经济学手册》中Stock和Watson&#xff08;1999&…

TypeScript和JavaScript的区别,全面解读超详细(二)

了解基础篇&#xff1a;请跳转 3.4.3 TS的编译 点击跳转 TypeScript和JavaScript的区别,全面解读超详细 我们知道.js的文件可以直接在浏览器中运行的&#xff0c;而.ts或者.tsx却不行&#xff0c;所以我们在运行TS项目时需要编译成浏览器引擎可以识别的JS语言。同时为了提高编…

体验最近火爆的ChatGPT

体验最近火爆的ChatGPT演示前言体验1. 回答问题2. 写方案、写作3. 写代码4. 各种古怪刁钻问题回答国内用户如何注册ChatGPT账号并在线体验如果不能在官网体验&#xff0c;可以在我的网站上体验演示 体验最近火爆的ChatGPT 前言 前几天OpenAI公布了ChatGPT算是火爆朋友圈&…