异步编程 - 13 高性能线程间消息传递库 Disruptor

news2024/11/23 22:39:35

文章目录

  • Disruptor概述
    • Disruptor中的核心术语
    • Disruptor 流程图
  • Disruptor的特性详解
  • 基于Disruptor实现异步编程

在这里插入图片描述


Disruptor概述

Disruptor是一个高性能的线程间消息传递库,它源于LMAX对并发性、性能和非阻塞算法的研究,如今构成了其Exchange基础架构的核心部分。

要理解Disruptor是什么,最好的方法是将它与目前你已经很好地理解且与之非常相似的东西进行比较,例如与Java的BlockingQueue进行对比。

与队列一样,Disruptor的目的也是在同一进程内的线程之间传递数据(例如消息或事件);

而与传统JDK中的队列不同的是,Disruptor提供了以下关键功能:

  • Disruptor中的同一个消息会向所有消费者发送,即多播能力(Multicast Event)。

  • 为事件预先分配内存(Event Preallocation),避免运行时因频繁地进行垃圾回收与内存分配而增加开销。

  • 可选择无锁(Optionally Lock-free),使用两阶段协议,让多个线程可同时修改不同元素。

  • 缓存行填充,避免伪共享(prevent false sharing)。

Disruptor中的核心术语

在理解Disruptor如何工作前,先了解一下Disruptor中的核心术语,即Disruptor中的DDD(Domain-Driven Design)域对象:

  • Ring Buffer:环形缓冲区,通常被认为是Disruptor的核心,但是从3.0版本开始,Ring Buffer仅负责存储和更新Disruptor中的数据(事件)。

  • Sequence:Disruptor使用Sequence作为识别特定组件所在位置的方法。每个消费者(EventProcessor)都像Disruptor本身一样维护一个Sequence。大多数并发代码依赖于这些Sequence值的移动,因此Sequence支持AtomicLong的许多当前功能。事实上,3.0版本与2.0版本之间唯一真正的区别是防止了Sequence和其他变量之间出现伪共享。

  • Sequencer:Sequencer是Disruptor的真正核心。该接口的2个实现(单生产者和多生产者)实现了所有并发算法,用于在生产者和消费者之间快速、正确地传递数据。

  • Sequence Barrier:序列屏障,由Sequencer产生,包含对Sequencer中主要发布者的序列Sequence和任何依赖的消费者的序列Sequence的引用。它包含了确定是否有可供消费者处理的事件的逻辑。

  • Wait Strategy:等待策略,确定消费者如何等待生产者将事件放入Disruptor。

  • Event:从生产者传递给消费者的数据单位。事件没有特定的代码表示,因为它完全由用户定义。

  • EventProcessor:用于处理来自Disruptor的事件的主事件循环,并拥有消费者序列的所有权。其有一个名为BatchEventProcessor的实现,它包含事件循环的有效实现,并将回调使用者提供的EventHandler接口实现(在线程池内运行BatchEventProcessor的run方法)。

  • EventHandler:由用户实现并代表Disruptor的消费者的接口。

  • Producer:调用Disruptor以将事件放入队列的用户代码。这个概念在代码中也没有具体表示。


Disruptor 流程图

介绍完Disruptor中的核心概念,我们将这些元素组合在一起,下所示为LMAX在其高性能核心服务中使用Disruptor的示例。图中有3个消费者,即日志记录JournalConsumer(将输入数据写入持久性日志文件)、复制ReplicationConsumer(将输入数据发送到另一台机器以确保存在数据的远程副本)和业务逻辑ApplicationConsumer(真正的处理工作),其中JournalConsumer和ReplicationConsumer是可以并行执行的。

在这里插入图片描述

【Disruptor示例流程图】

Producer向Disruptor的Ring Buffer中写入事件,消费者JournalConsumer和Replication Consumer(EventHandler)使用多播方式同时消费Ring Buffer中的每一个元素,两者都有各自的SequenceBarrier用来控制当前可消费Ring Buffer中的哪一个事件,并且当不存在可用事件时如何处理。消费者ApplicationConsumer则是等JournalConsumer和ReplicationConsumer对同一个元素处理完毕后,再处理该元素,这可以使用下图来概括。

在这里插入图片描述

【Disruptor示例流程简化图】

每个消费者持有自己的当前消费序号,由于是环形buffer,因而生产者写入事件时要看序号最小的消费者序号,以避免覆盖还没有被消费的事件,另外Consumer3只能消费已经被Consumer1、Consumer2都处理过的事件。


Disruptor的特性详解

Disruptor具有多播能力(Multicast),这是Java中队列和Disruptor之间最大的行为差异。

当有多个消费者在同一个Disruptor上监听事件时,所有事件都会发布给所有消费者,而Java队列中的每个事件只会发送给某一个消费者。Disruptor的行为旨在用于需要对同一数据进行独立的多个并行操作的情况。

Disruptor的目标之一是在低延迟环境中使用。在低延迟系统中,必须减少或移除运行时内存分配;在基于Java的系统中,目的是减少由于垃圾收集导致的系统停顿。为了支持这一点,用户可以预先为Disruptor中的事件分配其所需的存储空间(也就是声明Ring Buffer的大小)。在构造Ring Buffer期间,EventFactory由用户提供,并将在Disruptor的Ring Buffer中每个事件元素创建时被调用。将新数据发布到Disruptor时,API将允许用户获取构造的对象,以便调用方法或更新该存储对象上的字段,Disruptor保证这些操作只要正确实现就是并发安全的。

低延迟期望推动的另一个关键实现细节是使用无锁算法来实现Disruptor,所有内存可见性和正确性保证都是使用内存屏障(体现为volatile关键字)或CAS操作实现的。在Disruptor的实现中只有一种情况需要实际锁定——当使用BlockingWaitStrategy策略时,这仅仅是为了使用条件变量,以便在等待新事件到达时parked消费线程。许多低延迟系统将使用忙等待(busy-wait)来避免使用条件可能引起的抖动,但是大量在系统繁忙等待的操作可能导致性能显著下降,尤其是在CPU资源严重受限的情况下。

在JDK的BlockingQueue中添加或取出元素时是需要加独占锁的,通过锁来保证多线程对底层共享的数据结构进行并发读写的线程安全性,使用锁会导致同时只有一个线程可以向队列添加或删除元素。Disruptor则使用两阶段协议,让多个线程可同时修改不同元素,需要注意,消费元素时只能读取到已经提交的元素。在Disruptor中某个线程要访问Ring Buffer中某个序列号下对应的元素时,要先通过CAS操作获取对应元素的所有权(第一阶段),然后通过序列号获取对应的元素对象并对其中的属性进行修改,最后再发布元素(第二阶段),只有发布后的元素才可以被消费者读取。当多个线程写入元素时,它们都会先执行CAS操作,获取到Ring buffer中的某一个元素的所有权,然后可以并发对自己的元素进行修改。注意,只有序列号小的元素发布后,后面的元素才可以发布。可知相比使用锁,使用CAS大大减少了开销,提高了并发度。

其实在单生产者的情况下Disruptor甚至可以省去CAS的开销,因为在这种情况下,只有一个线程来请求修改Ring Buffer中的数据,生产者的序列号使用普通的long型变量即可。在创建Disruptor时是可以指定是单生产者还是多生产者的,如果你的业务就是单生产者模型,那么创建Disruptor时指定生产者模式为ProducerType.SINGLE效果会更好。

计算机系统中为了解决主内存与CPU运行速度的差距,在CPU与主内存之间添加了一级或多级高速缓冲存储器(Cache),这个Cache一般是集成到CPU内部的,所以也叫CPU Cache,下图所示为两级Cache结构。

在这里插入图片描述

【Cache结构图】

Cache内部是按行存储的,其中每一行称为一个Cache行(见下图)。Cache行是Cache与主内存进行数据交换的单位,大小一般为2的幂次数字节。

CPU访问某一个变量时,首先会去看CPU Cache内是否有该变量,如果有则直接从中获取,否则就去主内存里获取该变量,然后把该变量所在内存区域的一个Cache行大小的内存复制到Cache。由于存放到Cache行的是内存块而不是单个变量,所以可能会把多个变量存放到了一个Cache行。当多个线程同时修改一个Cache行里的多个变量时,由于同时只能有一个线程操作缓存行,因而相比每个变量放到一个Cache行性能会有所下降,这就是伪共享。

在这里插入图片描述

【Cache行】

如下图所示,变量x,y同时被放到了CPU的一级和二级缓存,当线程1使用CPU1对变量x进行更新时,首先会修改CPU1的一级缓存变量x所在缓存行,这时候缓存一致性协议会导致CPU2中变量x对应的缓存行失效,那么线程2写入变量x的时候就只能去二级缓存查找,这就破坏了一级缓存,而一级缓存比二级缓存更快,这里也说明了多个线程不可能同时去修改自己所使用的CPU中缓存行中相同缓存行里面的变量。

更坏的情况下CPU只有一级缓存,会导致频繁地直接访问主内存 。

在这里插入图片描述

【伪共享展示图】

Disruptor中的Ring Buffer底层是一个地址连续的数组,数组内相邻的元素很容易会被放入同一个Cache行里,从而导致伪共享的出现。Disruptor则通过缓存行填充,让数组中的每个元素独占一个缓存行从而解决了伪共享问题的出现。

另外为了避免Ring Buffer中序列号(定位元素的游标)与其他元素共享缓存行,对其也进行了缓存行填充,以提高访问序列号时缓存的命中率。


基于Disruptor实现异步编程

摘录官方的一个例子并稍加改造,例子的功能其实就是对上节介绍的例子的实现,这个例子中将含有一个生产者来生成元素,然后有两个消费者JournalConsumer和ReplicationConsumer并行消费同一个元素,等同一个元素都被消费后,消费者ApplicationConsumer再执行具体业务逻辑。

首先引入依赖包:

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

其次定义Ring Buffer中存放的事件对象,其定义如下:

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long get() {
        return value;
    }
}

以上代码定义了事件类型LongEvent,其中包含业务参数value。为了让Disruptor框架预分配Ring Buffer中的事件对象,需要实现EventFactory接口提供一个构造事件对象的方法,代码如下:

public class LongEventFactory implements EventFactory<LongEvent> {
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

再次创建具体的消费者用来消费生产的元素,这需要实现EventHandler接口,实现3个消费者:

//将输入数据写入持久性日志文件的消费者
public class JournalConsumer implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println(Thread.currentThread().getName() + "Persist Event: " + event.get());
    }
}

//将输入数据发送到另一台机器以确保存在数据的远程副本的消费者
public class ReplicationConsumer implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println(Thread.currentThread().getName() + "Replication Event: " + event.get());
    }
}

//真正处理业务逻辑的消费者
public class ApplicationConsumer implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println(Thread.currentThread().getName() + "Application Event: " + event.get());
    }
}

接着需要一个source来发布事件,source可以是来自于IO设备、网络、文件等的数据。下面使用原生API方式发布数据,发布者代码如下:

public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(long bb) {
        long sequence = ringBuffer.next(); // 8.1 第一阶段,获取序列号
        try {
            LongEvent event = ringBuffer.get(sequence); // 8.2 获取序列号对应的实体元素
            event.set(bb); // 8.3 修改元素的值
        } finally {
            ringBuffer.publish(sequence);// 8.4 发布元素
        }
    }
}

显然,事件发布变得比使用JDK中简单队列更复杂,这是由于对事件预分配的需求。它需要实现消息发布的两阶段,即第一阶段获取Ring Buffer的槽中对象并修改,第二阶段发布可用数据;还必须将发布包装在try/finally块中。如果在Ring Buffer中声明一个槽(调用RingBuffer.next()),那么必须发布这个序列,否则可能会导致序列状态被污染。

最后一步是将上面所有组件连接在一起。可以手动连接所有组件,但可能有点复杂,因此提供DSL以简化构造,组装代码如下:

public class LongEventMain {

    public static void main(String[] args) throws Exception {
        // 1.创建Ring Buffer中事件元素的工厂对象
        LongEventFactory factory = new LongEventFactory();

        // 2.指定Ring Buffer的大小,必须为2的幂次方
        int bufferSize = 1024;

        // 3.构造Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE,new BlockingWaitStrategy());

        // 4.注册消费者
        disruptor.handleEventsWith(new JournalConsumer(), new ReplicationConsumer()).then(new ApplicationConsumer());
        // 5.启动Disruptor, 启动线程运行
        disruptor.start();

        // 6.从Disruptor中获取Ring Buffer
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        // 7. 创建生产者
        LongEventProducer producer = new LongEventProducer(ringBuffer);

        // 8. 生产元素,并放入Ring Buffer
        for (long l = 0; l < 100; l++) {
            producer.onData(l);

            Thread.sleep(1000);
        }

        // 9.挂起当前线程
        Thread.currentThread().join();

    }
}

在上述代码中,代码1创建了一个事件对象生成的工厂对象;代码2指定Ring Buffer的大小;代码3构造Disruptor对象,其构造函数内会根据bufferSize的大小调用LongEventFactory创建对应个数的事件对象(事件预分配),并且这里使用DaemonThreadFactory.INSTANCE作为其背后异步任务调用的线程池(当然也可以传递自己的线程池)。另外,因为只有一个生产者,所以生产者模式设置为了ProducerType.SINGLE以便遵循Single Writer原则;最后设置Ring Buffer的等待策略为Blocking-WaitStrategy。

代码4注册消费者,注册了JournalConsumer、ReplicationConsumer和Application Consumer三个消费者,旨在等同一个元素被JournalConsumer和ReplicationConsumer消费后,ApplicationConsumer才可以消费对应的元素。

代码4执行完毕后框架还没启动,代码5的作用是启动框架内的线程;代码6从Disruptor中获取Ring Buffer,以便在后面向里面写入事件;代码7创建了一个生产者LongEventProducer实例并且把ringbuffer作为构造函数;代码8则循环创建100个数据,然后调用LongEventProducer的onData方法把事件发送出去,这个发送操作是异步的,会马上返回。

LongEventProducer的onData方法内代码8.1首先执行两阶段的第一阶段,也就是获取当前Ring Buffer中的序列号;代码8.2获取对应序列号对应的事件对象;代码8.3修改对象的属性;代码8.4则发布事件,发布后,其他消费者就对该元素可见了。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/989674.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NIFI使用InvokeHTTP发送http请求

说明 这里介绍四种平时常用的http请求方法&#xff1a;GET、POST、PUT、DELETE。 在官方的介绍文档中关于InvokeHTTP处理器的描述是这么说的&#xff1a; An HTTP client processor which can interact with a configurable HTTP Endpoint. The destination URL and HTTP Met…

java 企业工程管理系统软件源码 自主研发 工程行业适用

工程项目管理软件&#xff08;工程项目管理系统&#xff09;对建设工程项目管理组织建设、项目策划决策、规划设计、施工建设到竣工交付、总结评估、运维运营&#xff0c;全过程、全方位的对项目进行综合管理 工程项目各模块及其功能点清单 一、系统管理 1、数据字典&am…

网工内推 | 云架构运维、网络工程师,base北京,最高20k

01 协合新能源 招聘岗位&#xff1a;IT运维工程师 职责描述&#xff1a; 1、对集团内使用云计算架构&#xff08;Kubernetes&#xff09;的系统进行规划、运维及管理相关工作。 2、对集团数据中心系统的大数据基础架构&#xff08;Cloudera Distribution Hadoop&#xff09;的…

【办公类-16-06】20230901大班运动场地分配表-斜线排列、5天循环、不跳节日,手动修改节日”(python 排班表系列)

背景需求&#xff1a; 大班组长发来一个“运动排班”的需求表&#xff1a;“就是和去年一样的每个班的运动排班&#xff0c;就因为今年大班变成7个班&#xff0c;要重新做一份&#xff0c;不然我就用去年的那份了&#xff08;8个大班排班&#xff09;” &#xff08;拆了中8班…

STM32定时器的One Pulse Mode,OPM应用

文章目录 OPM应用1-精准延时应用2-精准定时 OPM T IMx_CR1的OPM位 位 3 OPM&#xff1a;单脉冲模式 (One-pulse mode) 0&#xff1a;计数器在发生更新事件时不会停止计数 1&#xff1a;计数器在发生下一更新事件时停止计数&#xff08;将 CEN 位清零&#xff09; 应用1-精准延时…

光学显微镜算法(OMA)(含MATLAB代码)

先做一个声明&#xff1a;文章是由我的个人公众号中的推送直接复制粘贴而来&#xff0c;因此对智能优化算法感兴趣的朋友&#xff0c;可关注我的个人公众号&#xff1a;启发式算法讨论。我会不定期在公众号里分享不同的智能优化算法&#xff0c;经典的&#xff0c;或者是近几年…

期权开户必读:费用、保证金和稳定性安全性必须兼备

期权开户的核心是判断50ETF方向&#xff0c;上涨下跌都能赚钱&#xff0c;其次选择0门槛期权平台要考量期权手续费和安全性是第一位&#xff0c;下文为大家科普期权开户的核心&#xff1a;费用、保证金和稳定性安全性必须兼备的知识点。本文来自 &#xff1a;期权酱 一、期权开…

如何把Android Framework学彻底?一条龙学习

Framework通俗易懂 平时学习 Android 开发的第一步就是去学习各种各样的 API&#xff0c;如 Activity&#xff0c;Service&#xff0c;Notification 等。其实这些都是 Framework 提供给我们的。Framework 层为开发应用程序提供了非常多的API&#xff0c;我们通过调用这些 API …

自然语言处理——数据清洗

一、什么是数据清洗 数据清洗是指发现并纠正数据文件中可识别的错误的最后一道程序&#xff0c;包括检查数据一致性&#xff0c;处理无效值和缺失值等。与问卷审核不同&#xff0c;录入后的数据清理一般是由计算机而不是人工完成。 ——百度百科 二、为什么要数据清洗 现实生…

Apipost:你API管理中的得力助手

API管理的难点在哪&#xff1f; 相信无论是前端&#xff0c;还是后端的测试和开发人员&#xff0c;都遇到过这样的困难。不同工具之间数据一致性非常困难、低效。多个系统之间数据不一致&#xff0c;导致协作低效、频繁出问题&#xff0c;开发测试人员痛苦不堪。 开发人员在 …

STM32F4X RTC

STM32F4X RTC 什么是RTCSTM32F4X RTCSTM32F4X RTC框图STM32F4X RTC计数频率STM32F4X RTC日历STM32F4X RTC闹钟 STM32F4X RTC例程 什么是RTC RTC全程叫Real-Time Clock实时时钟&#xff0c;是MCU中一个用来计时的模块。RTC的一个主要作用是用来显示实时时间&#xff0c;就像日常…

Visual Studio 2019下使用C++与Python进行混合编程——环境配置与C++调用Python API接口

前言 在vs2019下使用C与Python进行混合编程,在根源上讲&#xff0c;Python 本身就是一个C库&#xff0c;那么这里使用其中最简单的一种方法是把Python的C API来嵌入C项目中&#xff0c;来实现混合编程。当前的环境是&#xff0c;win10,IDE是vs2019,python版本是3.9&#xff0c…

一个帮各位填秋招表格省一点事的浏览器插件

最近应该很多和我一样的双非鼠鼠在秋招等面试&#xff0c;而且处于海投阶段&#xff0c;为了不忘记投了哪些公司&#xff0c;可以用这样一个表格来记录&#xff1a; 其中有些字段&#xff0c;比如状态、投递时间、查看进度的网址其实可以不手动输入&#xff0c;所以搞个插件来…

2023数模国赛C 题 蔬菜类商品的自动定价与补货决策-完整版创新多思路详解(含代码)

题目简评&#xff1a;看下来C题是三道题目里简单一些的&#xff0c;考察的点比较综合&#xff0c;偏数据分析。涉及预测模型和运筹优化(线性规划)&#xff0c;还设了一问开放型问题&#xff0c;适合新手入门&#xff0c;发挥空间大。 题目分析与思路&#xff1a; 背景&#x…

部署zookeeper集群

zookeeper和jdk下载地址 jdk 链接&#xff1a;https://pan.baidu.com/s/13GpNaAiHM5HSDJ66ebBtEg 提取码&#xff1a;90se zookeeper 链接&#xff1a;https://pan.baidu.com/s/1nSFKEhSGNiwgSPZWdb7hkw 提取码&#xff1a;u5l2 在所有的机器上面执行下面步骤&#xff1a; 1.上…

C++的纯虚函数和抽象类

在C++中,可以将虚函数声明为纯虚函数,语法格式为: virtual 返回值类型 函数名 (函数参数) = 0; 纯虚函数没有函数体,只有函数声明,在虚函数声明的结尾加上=0,表明此函数为纯虚函数。 最后的=0并不表示函数返回值为0,它只起形式上的作用,告诉编译系统“这是纯虚函数”。…

继承的偏移量问题

下面是实际测试&#xff1a; p1 p3 ! p2 Base1* p1 &d; Derive* p3 &d;! Base2* p2 &d; 图解&#xff1a;

斯坦福小镇升级版——AI-Town搭建指南

导语&#xff1a; 8月份斯坦福AI小镇开源之后&#xff0c;引起了 AIGC 领域的强烈反响&#xff0c;但8月份还有另一个同样非常有意义的 AI-Agent 的项目开源&#xff0c;a16z主导的 AI-Town 本篇文章主要讲解如何搭建该项目&#xff0c;如有英文基础或者对这套技术栈熟悉&#…

监控系统prometheus部署

wget -c https://github.com/prometheus/prometheus/releases/downloa d/v2.37.1/prometheus-2.37.1.linux-amd64.tar.gz下载必要的组件。 mkdir -p /opt/prometheus创建目录。 此文章为9月Day 8学习笔记&#xff0c;内容来源于极客时间《运维监控系统实战笔记》。

docker安装开发常用软件MySQL,Redis,rabbitMQ

Docker安装 docker官网&#xff1a;Docker: Accelerated Container Application Development docker镜像仓库&#xff1a;https://hub.docker.com/search?qnginx 官网的安装教程&#xff1a;Install Docker Engine on CentOS | Docker Docs 安装步骤 1、卸载以前安装的doc…