单机无锁线程安全队列-Disruptor

news2024/11/28 8:28:10

Disruptor

1、基本介绍

说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。
而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。

2、与BlockingQueue对比

  1. 使用CAS代替锁
  2. 多播模式,同一事件可以交给多个消费者处理
  3. 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收

这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。

img.png

3、生产者消费者模式

在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:

引入最新包

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>4.0.0</version>
        </dependency>

定义一个商品

@Data
public class Goods {

    private String name;

}

定义生产者

public class Producer {
    private final RingBuffer<Goods> ringBuffer;

    public Producer(RingBuffer<Goods> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * 生产货品
     * @param goodsName
     */
    public void onData(String goodsName) {
        long sequence = ringBuffer.next();
        try {
            Goods goods = ringBuffer.get(sequence);
            goods.setName(goodsName);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

定义消费者

@Data
public class Consumer implements EventHandler<Goods>{

    private String name;

    public Consumer(String name){
        this.name = name;
    }

    @Override
    public void onEvent(Goods goods, long l, boolean b)  {
        //消费者接收到货品
        System.out.println(name+"消费了"+goods.getName());
    }

    @Override
    public void onBatchStart(long batchSize, long queueDepth) {
        EventHandler.super.onBatchStart(batchSize, queueDepth);
    }

    @Override
    public void onStart() {
        EventHandler.super.onStart();
    }

    @Override
    public void onShutdown() {
        EventHandler.super.onShutdown();
    }

    @Override
    public void onTimeout(long sequence) throws Exception {
        EventHandler.super.onTimeout(sequence);
    }

    @Override
    public void setSequenceCallback(Sequence sequenceCallback) {
        EventHandler.super.setSequenceCallback(sequenceCallback);
    }
}

一个生产者对一个消费者

img_1.png

public class DisruptorDemo {
    
    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.SINGLE,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        //单生产者,单消费者
        disruptor.handleEventsWith(new Consumer("Consumer1"));
        disruptor.start();
        Producer producer = new Producer(ringBuffer);
        while (true){
            producer.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

一个生产者对多个消费者

消费者按顺序消费:

img_2.png

public class DisruptorDemo {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.MULTI,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        //多个消费者按顺序消费
        disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));
        disruptor.start();
        Producer producer = new Producer(ringBuffer);
        while (true){
            producer.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

多播模式,同一事件可以交给多个消费者处理

img_4.png
只需要将上述代码修改一下即可

   //Consumer1、Consumer2、Consumer3先消费,Consumer4后消费
   disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3"))
   .then(new Consumer("Consumer4"));

多个生产者对多个消费者

img_5.png

public class DisruptorDemo {

    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.MULTI,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));
        disruptor.start();
        Producer producer1 = new Producer(ringBuffer);
        Producer producer2 = new Producer(ringBuffer);
        Producer producer3 = new Producer(ringBuffer);
        while (true){
            producer1.onData("goods"+UUID.randomUUID());
            producer2.onData("goods"+UUID.randomUUID());
            producer3.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了

img_8.png
看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。

img_9.png

img_10.png

4、RingBuffer原理

Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。

image.png
1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。
2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。
3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。
4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。

img_11.png
5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:https://tech.meituan.com/2016/11/18/disruptor.html

5、等待策略

生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。
Disruptor目前一共内置了8种等待策略。

img_7.png

  1. BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
  2. BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
  3. LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
  4. TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
  5. LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
  6. SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
  7. YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  8. PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

6、结束

Disruptor简单的介绍已经结束了,点个赞再走啦!~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1286626.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代替APP?微信小程序到底好在哪?

2019年是微信小程序宣布登场的一年&#xff0c;它实现了应用程序能被“垂手可得”的愿望。用户只需简单扫一扫或搜索&#xff0c;就能轻松打开应用。与需要在应用市场下载的APP相比&#xff0c;微信小程序可以在微信中被轻易地获取和传播&#xff0c;同时也带来了非凡的使用体验…

102.套接字-Socket网络编程4(TCP通信流程)

目录 TCP编程流程 套接字函数 1.创建套接字 2.绑定地址 3.监听连接请求 4.接受连接 5. 连接到服务器 6. 发送数据 7. 接收数据 8.关闭套接字 服务器端通信流程 示例代码 客户端通信流程 代码示例 TCP编程流程 TCP是一个面向连接的&#xff0c;安全的&#xff0c;流…

单调栈与单调队列算法总结

单调栈 知识概览 单调栈最常见的应用是找到每一个数离它最近的且比它小的数。单调栈考虑的方式和双指针类似&#xff0c;都是先想一下暴力做法是什么&#xff0c;然后再挖掘一些性质如单调性&#xff0c;最终可以把目光集中在比较少的状态中&#xff0c;从而达到降低时间复杂…

【Linux】基础IO--重定向理解Linux下一切皆文件缓冲区

文章目录 一、重定向1.什么是重定向2.dup2 系统调用3.理解输入重定向、输出重定向和追加重定向4.简易shell完整实现 二、理解linux下一切皆文件三、缓冲区1.为什么要有缓冲区2.缓冲区的刷新策略3.缓冲区的位置4.实现一个简易的C语言缓冲区5.内核缓冲区 一、重定向 1.什么是重定…

VMware虚拟机系统CentOS镜像的下载

文章目录 阿里云下载官网下载参考文档 一些小版本可能过时或者其他原因已经不能存在了&#xff0c;只有大版本号最新的&#xff0c;或者其他最新版本 阿里云下载 1-百度搜索&#xff1a;阿里云 2-找到开发者社区 3-找到下载&#xff0c;选择镜像 4-选择系统 5-点击镜像地…

【eNSP实践】eNSP实战篇(2)之简单实现交换机与主机的配置(图文详解)

目录 写在前面涉及知识1、交换机实验1.1 实验条件1.2 实验步骤A、打开eNSP软件&#xff0c;创建拓扑B、搭建主机与交换机连线C、配置交换机和主机D、验证不同网段设备可通性 1.3 通过交换机查看MAC地址 写在最后 写在前面 其实前面文章我有介绍关于路由器的使用&#xff0c;但…

OCP Java17 SE Developers 复习题08

答案 答案 答案 A. This code is correct. Line 8 creates a lambda expression that checks whether the age is less than 5, making option A correct. Since there is only one parameter and it does not specify a type, the parentheses around the parameter are …

TrustZone​之在安全状态之间切换

如果处理器处于NS.EL1,而软件想要转移到S.EL1,应该如何实现呢? 要改变安全状态,无论是向上还是向下,执行都必须经过EL3,如下图所示: 前面的图表显示了在不同安全状态之间移动涉及的步骤的示例序列。逐步进行解释: 进入较高的异常级别需要一个异常。通常,此异常…

网络程序设计

互相连接&#xff0c;发送信息 tcp和udp协议 tcp会有准备&#xff0c;udp不会准备。 8080端口&#xff1a;tomcat端口&#xff0c;java和web相连接 80端口&#xff1a;http 21端口&#xff1a;ftp 套接字 socket&#xff1a;提供给程序可以对外进行连接的接口 ip地址 特…

利用github copilot完成代码,利用正则化完成字符串中信息查找

利用正则化完成字符串中的字符拆解。 下面的代码是实现在“计算机组成原理-计科2101-123456-小明同学.docx”中提取出班级&#xff08;grade&#xff09;&#xff0c;学号&#xff08;id&#xff09;&#xff0c;姓名&#xff08;name&#xff09;。以下的代码都是github copi…

java中Random随机数使用和生成随机数的多个示例

在 Java 中&#xff0c;我们可以使用 java.util.Random 类生成伪随机数。伪随机数的特性是&#xff0c;虽然它们看起来是随机的&#xff0c;但实际上它们是由一个固定的算法生成的。只要我们提供相同的种子&#xff0c;这个算法就会生成相同的数字序列。 首先&#xff0c;我们…

Java链接数据库

本文介绍的是Java链接数据库中的JDBC操作&#xff0c;JDBC虽然现在用的不多&#xff0c;但面试的时候会问道。需要有相应的了解。下面以链接MySQL为例子。 JDBC 什么jdbc Java DataBase Connectivity是一种用于执行SQL语句的Java API&#xff0c;它由一组用Java语言编写的类和…

pod容器内无法访问集群外部主机ipv6地址

一、背景 同事反馈他这边有一环境出现pod容器内无法请求集群外部主机ipv6地址&#xff0c;但是在pod所在集群所主机上是可以请求到外部主机ipv6地址。 二、问题处理过程 首先主机和主机之间ipv6地址能通讯&#xff0c;说明主机之间网络是没啥问题&#xff0c;哪问题就出在容器…

Python语言基础学习大纲(由某大模型生成)

自从上次经丙察察游了一次滇藏线&#xff0c;已有3个没写一篇了。今天利用由某大模型生成的上面这张思维导图&#xff0c;配合这个大模型生成的6000多字拼凑出一篇博文聊以交差。 Python语言概述 一、语言特点 1.语法简单明了 Python的语法简洁易懂&#xff0c;使得编写代码…

邮件群发工具的功能:实用性与高效率功能推荐

市场营销对于每个企业来讲都至关重要&#xff0c;他能为企业带来商机的增长&#xff0c;获得持续的收益。邮件营销作为一种传统但是较少为众多行业使用的营销手段&#xff0c;同样也存在着无限的潜力。 它可以实现&#xff1a; 精准点对点个性化营销。数据报表追踪营销效果。…

如果不小心修改了按钮的名字并且忘记了原名字

出现上述情况&#xff0c;可以右边点击转到代码&#xff0c;注释掉问题行&#xff0c;此页的设计界面就恢复了。

Taro 学习教程 - - - - - 开发环境的安装 helloworld

一、Taro脚手架安装 npm install tarojs/cli -g // or yarn add tarojs/cli -g // or cnpm install tarojs/cli -g1.1 如何判断taro安装成功 taro -v正常安装成功之后显示如图&#xff1a; 1.2 环境变量配置(自行判断是否需要手动配置) 如果遇到如下问题&#xff0c;则是需要…

顶级资源!五个免费图标素材网站

图片太花哨了&#xff0c;纯文本太单调了&#xff1f;别忘了设计师的魔法武器——图标&#xff01;图标材料是UI设计师不可缺少的一部分。优秀的图标设计不仅可以提高界面美感&#xff0c;还可以提高用户的互动体验&#xff0c;帮助用户更好地了解应用程序的功能和信息。在本文…

2024年CSC国际区域问题研究及外语高层次人才培养项目介绍

国家留学基金委&#xff08;CSC&#xff09;公布了2024年国际区域问题研究及外语高层次人才培养项目&#xff0c;申报时间均为3月中下旬。为帮助关注者了解项目申报情况&#xff0c;知识人网小编特整理本文。 近日&#xff0c;国家留学基金委&#xff08;CSC&#xff09;公布了…

P4715 【深基16.例1】淘汰赛-仅思路

首先从题干要求入手&#xff0c;我们可以了解到题目要求是二进一&#xff0c;不难想到这是二叉树的题 再来&#xff0c;从题干可以知道&#xff0c;我们所采用的结构体除了需要有树的两个左右节点指针外&#xff0c;还需要两个变量用来储存“能力值”和“编号” 在这道题中&am…