2-2-3-5-5、Disruptor详解

news2025/1/15 13:05:52

目录

  • 简介
  • juc包下的队列
    • 存在的问题
  • 设计方案
  • RingBuffer数据结构
    • 数据存取方案
    • 常用等待策略
  • 写数据流程
    • 单线程(一个生产者)
    • 多线程(多个生产者)
      • 多个消费者读数据
      • 多个生产者写数据
  • 核心概念
  • 使用
    • 构造器
    • 引入依赖
    • 单生产者单消费者模式
      • 创建Event(消息载体/事件)和EventFactory(事件工厂)
      • 创建消息(事件)生产者
      • 创建消费者
      • 测试
    • 单生产者多消费者模式
    • 多生产者多消费者模式
    • 消费者优先级模式

简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单, 2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。注意,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列
Github:https://github.com/LMAX-Exchange/disruptor
Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型

juc包下的队列

队列描述
ArrayBlockingQueue基于数组结构实现的一个有界阻塞队列
LinkedBlockingQueue基于链表实现的一个无界阻塞对列,指定容量即为有界阻塞队列
PriorityBlockingQueue支持按优先级排序的无界阻塞队列
DelayQueue基于PriorityBlockingQueue实现的支持延迟的无界阻塞队列
SynchronousQueue不存储元素的阻塞队列
LinkedTransferQueue基于链表结构实现的一个无界阻塞队列
LinkedBlockingDeque基于链表结构实现的一个双端无界阻塞队列

存在的问题

  1. juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列
  2. 加锁的方式通常会严重影响性能。 线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患
  3. 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)

设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

  • 环形数组结构

**为了避免垃圾回收,采用数组而非链表。**同时,数组对处理器的缓存机制更加友好(空间局部性原理)

  • 元素位置定位

数组长度2^n,通过位运算,加快定位的速度。 下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完

  • 无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据

  • 利用缓存行填充解决了伪共享的问题
  • 实现了基于事件驱动的生产者消费者模型(观察者模式)

消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费

RingBuffer数据结构

使用RingBuffer来作为队列的数据结构,RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用。原理图如下所示:

在这里插入图片描述

数据存取方案

  • Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)
  • 当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉。一旦出现覆盖数据,并且这个数据是没有消费的数据,那么就会触发等待策略

常用等待策略

BlockingWaitStrategy策略:常见且默认的等待策略,当这个队列里满了,不执行覆盖,而是阻塞等待。使用ReentrantLock+Condition实现阻塞,最节省cpu,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy策略:会在循环中不断等待数据。先进行自旋等待如果不成功,则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时。典型的应用场景就是异步日志
YieldingWaitStrategy策略:这个策略用于低延时的场合。 消费者线程会不断循环监控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略。个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略
BusySpinWaitStrategy策略: 采用死循环,消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用,cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用

写数据流程

单线程(一个生产者)

  1. 申请写入m个元素
  2. 若是有m个元素可以写入,则返回最大的序列号。这里主要判断是否会覆盖未读的元素
  3. 若是返回的正确,则生产者开始写入元素

在这里插入图片描述

多线程(多个生产者)

多个生产者的情况下,会遇到**“如何防止多个线程重复写同一个元素”**的问题。Disruptor的解决方法是每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可
但是会遇到一个新问题:**如何防止读取的时候,读到还未写的元素。**Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历availableBuffer,来判断元素是否已经就绪

多个消费者读数据

生产者多线程写入的情况下读数据会复杂很多:

  1. 申请读取到序号n
  2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置
  3. 消费者读取元素

如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。然后,消费者读取下标从3到6共计4个元素

在这里插入图片描述

多个生产者写数据

多个生产者写入的时候:

  1. 申请写入m个元素
  2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间
  3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的

如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成

在这里插入图片描述

核心概念

  • RingBuffer(环形缓冲区):基于数组的内存级别缓存,是创建sequencer(序号)与定义WaitStrategy(拒绝策略)的入口
  • Disruptor(总体执行入口):对RingBuffer的封装,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用
  • Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享
  • Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法
  • SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑
  • WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略
  • Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义
  • EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口
  • EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence

在这里插入图片描述

使用

构造器

public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
  • EventFactory:创建事件(任务)的工厂类
  • ringBufferSize:容器的长度
  • ThreadFactory :用于创建执行任务的线程
  • ProductType:生产者类型:单生产者、多生产者
  • WaitStrategy:等待策略

引入依赖

<!-- disruptor -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.4</version>
</dependency>

单生产者单消费者模式

创建Event(消息载体/事件)和EventFactory(事件工厂)

创建 OrderEvent 类,这个类将会被放入环形队列中作为消息内容。创建OrderEventFactory类,用于创建OrderEvent事件

@Data
public class OrderEvent {
    private long value;
    private String name;
}
public class OrderEventFactory implements EventFactory<OrderEvent> {
    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}

创建消息(事件)生产者

创建 OrderEventProducer 类,它将作为生产者使用

public class OrderEventProducer {
    //事件队列
    private final RingBuffer<OrderEvent> ringBuffer;
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void onData(long value, String name) {
        // 获取事件队列 的下一个槽
        long sequence = ringBuffer.next();
        try {
            //获取消息(事件)
            OrderEvent orderEvent = ringBuffer.get(sequence);
            // 写入消息数据
            orderEvent.setValue(value);
            orderEvent.setName(name);
        } catch (Exception e) {
            // TODO  异常处理
            e.printStackTrace();
        } finally {
            System.out.println("生产者" + Thread.currentThread().getName()
                    + "发送数据value:" + value + ",name:" + name);
            //发布事件
            ringBuffer.publish(sequence);
        }
    }
}

创建消费者

创建 OrderEventHandler 类,并实现 EventHandler ,作为消费者一般单个消费者实现EventHandler即可,WorkHandler用于处理多个消费者消息重复消费的问题

public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
        // TODO 消费逻辑
        System.out.println("消费者" + Thread.currentThread().getName() + "获取数据value:" + event.getValue() + ",name:" + event.getName());
    }
    @Override
    public void onEvent(OrderEvent event) {
        // TODO 消费逻辑
        System.out.println("消费者" + Thread.currentThread().getName() + "获取数据value:" + event.getValue() + ",name:" + event.getName());
    }
}

测试

public class DisruptorDemo {
    public static void main(String[] args) throws Exception {
        //创建disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                OrderEvent::new,
                1024 * 1024,
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE, //单生产者
                new YieldingWaitStrategy()  //等待策略
        );
        //设置消费者用于处理RingBuffer的事件
        disruptor.handleEventsWith(new OrderEventHandler());
        //启动disruptor
        disruptor.start();
        //创建ringbuffer容器
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        //创建生产者
        OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
        // 发送消息
        for (int i = 0; i < 100; i++) {
            eventProducer.onData(i, "Fox" + i);
        }
        disruptor.shutdown();
    }
}

单生产者多消费者模式

如果消费者是多个,只需要在调用 handleEventsWith 方法时将多个消费者传递进去

disruptor.handleEventsWith(new OrderEventHandler());
//使用下面的代码替换上面的代码
disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());

上面传入的两个消费者会重复消费每一条消息,如果想实现一条消息在有多个消费者的情况下,只会被一个消费者消费,那么需要调用 handleEventsWithWorkerPool 方法

disruptor.handleEventsWith(new OrderEventHandler());
//使用下面的代码替换上面的代码
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());

注意:消费者要实现WorkHandler接口用于实现被一个消费者消费

多生产者多消费者模式

在实际开发中,多个生产者发送消息,多个消费者处理消息才是常态

public class DisruptorDemo2 {
    public static void main(String[] args) throws Exception {
        //创建disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                new OrderEventFactory(),
                1024 * 1024,
                Executors.defaultThreadFactory(),
                ProducerType.MULTI, //多生产者
                new YieldingWaitStrategy()  //等待策略
        );
        //设置消费者用于处理RingBuffer的事件
        //disruptor.handleEventsWith(new OrderEventHandler());
        //设置多消费者,消息会被重复消费
        //disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
        //设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
        disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
        //启动disruptor
        disruptor.start();
        //创建ringbuffer容器
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        new Thread(() -> {
            //创建生产者
            OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
            // 发送消息
            for (int i = 0; i < 100; i++) {
                eventProducer.onData(i, "Fox" + i);
            }
        }, "producer1").start();
        new Thread(() -> {
            //创建生产者
            OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
            // 发送消息
            for (int i = 0; i < 100; i++) {
                eventProducer.onData(i, "monkey" + i);
            }
        }, "producer2").start();
        disruptor.shutdown();
    }
}

消费者优先级模式

在实际场景中,我们通常会因为业务逻辑而形成一条消费链。比如一个消息必须由 消费者A ->消费者B -> 消费者C 的顺序依次进行消费。在配置消费者时,可以通过 .then 方法去实现顺序消费

disruptor.handleEventsWith(new OrderEventHandler())
.then(new OrderEventHandler())
.then(new OrderEventHandler());

handleEventsWith 与 handleEventsWithWorkerPool 都是支持 .then 的,它们可以结合使用。比如可以按照 消费者A -> (消费者B 消费者C) -> 消费者D 的消费顺序

disruptor.handleEventsWith(new OrderEventHandler())
.thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler())
.then(new OrderEventHandler());

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/82118.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信公众号开发——接收用户消息(图文、语言、上报位置、关注、取消关注)及自动回复

&#x1f60a; 作者&#xff1a; 一恍过去&#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390&#x1f38a; 社区&#xff1a; Java技术栈交流&#x1f389; 主题&#xff1a; 微信公众号开发——接收用户消息(图文、语言、上报位置、关注、取消关注)及…

推动MRO工业品数字化基建升级,数商云采购系统赋能企业采购数字化管理

MRO工业品是工业生产中的重要组成部分&#xff0c;经历了十余年的发展成长&#xff0c;市场规模持续增长&#xff0c;然而据数据显示&#xff0c;MRO工业品市场的线上渗透率仍停留在个位数&#xff0c;这意味着MRO工业品数字化采购仍有巨大的发展空间。 MRO工业品行业发展受困…

非零基础自学Golang 第1章 走进Go 1.1 Go编程语言概述 1.1.1 Go 的历史

非零基础自学Golang 文章目录非零基础自学Golang第1章 走进Go1.1 Go编程语言概述1.1.1 Go 的历史第1章 走进Go 1.1 Go编程语言概述 Go语言也叫Golang&#xff0c;是由谷歌&#xff08;Google&#xff09;公司在2007年推出的一款静态编译型语言。Go语言高效、简洁、容易上手&a…

上海诺基亚贝尔-S-010W-AV2B卡刷固件包

上海诺基亚贝尔-S-010W-AV2B卡刷固件包 固件特点&#xff1a; 1、修改dns&#xff0c;三网通用&#xff1b; 2、开放原厂固件屏蔽的市场安装和u盘安装apk&#xff1b; 3、无开机广告&#xff0c;无系统更新&#xff0c;不在被强制升级&#xff1b; 4、大量精简内置的没用的…

最新版Crack:MailBee.NET 2022最后版

MailBee.NET Items Package 包括 SMTP、POP3、IMAP、EWS、Security、Antispam、Outlook Converter、Address Validator、PDF 部件&#xff0c;以及作为免费功能的 BounceMail、HTML、MIME、ICalVCard 部件。MailBee.NET Objects是一组功能强大且功能丰富的 .NET 元素&#xff0…

基于nodejs如何爬取csdn上自己的文章

当你想把自己在csdn上写的文章转为hexo上可以发布的文章或是将文章写入自己的数据库时,可以用到 将所有博客数据写入数据库 获取你的文章的分页接口: 在浏览自己的所有文章时,我们不难发现,文章的数据是往下滑动到底时,才会刷新出新的数据, 那么此时肯定是发送了一个请求来获…

一篇文章教你实战Docker容器数据卷

在上一篇中&#xff0c;咱们对Docker中的容器数据卷做了介绍。已经知道了容器数据卷是什么&#xff1f;能干什么用。那么本篇咱们就来实战容器数据卷&#xff0c;Docker容器数据卷案例主要做以下三个案例 1&#xff1a;宿主机(也就是Docker所安装的机器)与容器之间的映射-让Do…

LeetCode 538. 把二叉搜索树转换为累加树(C++)

标签&#xff1a;二叉树搜索 深度优先遍历 二叉树 思路一&#xff1a;递归实现反向中序遍历&#xff0c;并累加递归过程中的根的值 思路二&#xff1a;使用迭代&#xff0c;给每个根节点添加一个反向中序遍历的前驱节点。 原题链接&#xff1a;https://leetcode.cn/problems/co…

数据分析业务场景 | CTR预估

一.概况 定义 是推荐中最核心的算法之一 对每次广告的点击情况做出预测&#xff0c;预测用户是点击还是不点击 就是预测点击与否的而分类算法&#xff0c;成功的关键之一就是样本的准确性 对于正样本&#xff0c;一般可发挥的空间不是很大&#xff0c;最多就是卡一个停留时…

LinkedList源码解析

LinkedList源码解析 简介 LinkedList 是一个双向链表&#xff08;内部是 Node 节点&#xff09;实现的 List&#xff0c;并且还实现了 Deque 接口&#xff0c;它除了作为 List 使用&#xff0c;还可以作为队列或者栈来使用。 这样看来&#xff0c;LinkedList 简直就是个全能…

【文字版】津津有味:感染新冠后没食欲,那咱也得吃饭啊!

点击文末“阅读原文”即可收听本期节目剪辑、音频 / 朱峰 编辑 / 姝琦 SandLiu监制 / 姝琦 文案 / 粒粒 产品统筹 / bobo你阳了吗&#xff1f;你嗓子疼吗&#xff1f;你的食欲还好吗&#xff1f;没有什么比好好吃饭更重要&#xff0c;生病的时候尤其是。营养对健康而言是预防…

浏览器上的Cookies有什么用?超级浏览器防关联如何实现?

Cookies是浏览器的指纹信息之一&#xff0c;它是一种文本文件&#xff0c;是网站为了辨别用户身份&#xff0c;对用户进行记录并由户客户端计算机进行暂时或永久保存的信息。一般情况下&#xff0c;网站对浏览器的指纹的记录主要是依靠Cookies来实现的。因为超级浏览器来可以生…

[附源码]JAVA毕业设计英语网站(系统+LW)

[附源码]JAVA毕业设计英语网站&#xff08;系统LW&#xff09; 项目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xf…

[附源码]Python计算机毕业设计大学生健康系统Django(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等…

c#和Python交互,完美解决Python调用OpenCV等第三方库以及分发时需配置python环境的问题

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、问题分析二、解决方案第一个问题第二个问题三、结果及源码四、总结前言 关于C#如何调用Python&#xff0c;网上提供了很多解决方案&#xff0c;有用ironPyt…

react组件深度解读

五、React 核心是组件 在 React 中&#xff0c;我们使用组件&#xff08;有状态、可组合、可重用&#xff09;来描述 UI 。 在任何编程语言中&#xff0c;你都可以将组件视为简单的函数。 React 组件也一样&#xff0c; 它的输入是 props&#xff0c;输出是关于 UI 的描述。我…

Win11 WSL Linux子系统安装与注销 配置conda环境 启动jupyter

1 前言 本篇博客讲解如何在Windows11系统中安装与注销Linux子系统&#xff0c;并配置conda环境、jupyter环境&#xff0c;实现在Local浏览器启动jupyter并运行项目。 2 安装Linux子系统&#xff08;参考文章[1]&#xff09; 1.1 WSL 在任务栏中的搜索功能中&#xff0c;搜索…

合并多文件后分组再结构化

【问题】 Heres the problem statement: In a folder in HDFS, therere a few csv files with each row being a record with the schema (ID, attribute1, attribute2, attribute3). Some of the columns (except ID) could be null or empty strings, and no 2 records wi…

汇编语言常用DOS功能调用示例

1.利用DOS功能调用输出响铃&#xff08;响铃的ASCII码为07H&#xff09;。建立源程序文件HELLO.ASM&#xff0c;通过汇编程序得到目标文件RING.OBJ以及列表文件RING.LST&#xff0c;通过连接程序得到可执行文件性文件 RING.EXE。对可执行性文件进行调试。 &#xff08;1&…

【数据结构】——栈和队列

目录 1.栈 1.1栈的概念及结构 1.2栈的实现 1.2.1具体实现 Stack.h 栈初始化 栈销毁 入栈 出栈 取栈顶数据 判空 栈中有效元素的个数 全部Stack.c的代码 测试Test.c代码 2.队列 2.1队列的概念及结构 2.2队列的实现 Queue.h 队列初始化 队列销毁 队尾入队列…