初步了解高性能队列——Disruptor(Java)

news2025/1/16 12:51:20

高性能队列——Disruptor

① 概述

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内部的内存队列的延迟问题,而不是分布式队列。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。

传统队列存在的问题

队列有界性结构队列类型
ArrayBlockingQueue有界加锁数组阻塞
LinkedBlockingQueue可选加锁链表阻塞
ConcurrentLinkedQueue无界无锁链表非阻塞
LinkedTransferQueue无界无锁链表阻塞
PriorityBlockingQueue无界加锁阻塞
DelayQueue无界加锁阻塞

队列的底层数据结构一般分成三种:数组、链表和堆。其中,堆这里是为了实现带有优先级特性的队列,暂且不考虑。
在稳定性和性能要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。但是ArrayBlockingQueue是通过加锁的方式保证线程安全,而且ArrayBlockingQueue还存在伪共享问题,这两个问题严重影响了性能。


缓存行

Cache是由很多个cache line组成的。每个cache line通常是64字节,并且它有效地引用主内存中的一块儿地址。一个Java的long类型变量是8字节,因此在一个缓

存行中可以存8个long类型的变量。

CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个cache line。

在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个。因此你能非常快的遍历这个数组。

伪共享

ArrayBlockingQueue有三个成员变量: - takeIndex:需要被取走的元素下标 - putIndex:可被元素插入的位置的下标 - count:队列中元素的数量

这三个变量很容易放到一个缓存行中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果。

在这里插入图片描述

如上图所示,当生产者线程put一个元素到ArrayBlockingQueue时,putIndex会修改,从而导致消费者线程的缓存中的缓存行无效,需要从主存中重新读取。

这种无法充分使用缓存行特性的现象,称为伪共享。

对于伪共享,一般的解决方案是,增大数组元素的间隔使得由不同线程存取的元素位于不同的缓存行上,以空间换时间。

② 高性能的原因

Disruptor通过以下设计来解决队列速度慢的问题:

  • 环形数组结构

    为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。数组元素不会被回收,避免频繁的GC。

  • 元素位置定位

    数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 无锁设计

    每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。采用CAS无锁方式,保证线程的安全性

环形数组

  • 因为是数组,所以要比链表快,而且根据我们对上面缓存行的解释知道,数组中的一个元素加载,相邻的数组元素也是会被预加载的,因此在这样的结构中,cpu无需时不时去主存加载数组中的下一个元素。而且,你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。此外,不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。环形数组中的元素采用覆盖方式,避免了jvm的GC。
  • 其次结构作为环形,数组的大小为2的n次方,这样元素定位可以通过位运算效率会更高,这个跟一致性哈希中的环形策略有点像。在disruptor中,这个环形结构就是RingBuffer,既然是数组,那么就有大小,而且这个大小必须是2的n次方。
  • 实质只是一个普通的数组,只是当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据,于是就相当于一个环

生产和消费模式

在Disruptor中生产者分为单生产者和多生产者,而消费者并没有区分。单生产者情况下,就是普通的生产者向RingBuffer中放置数据,消费者获取最大可消费

位置,并进行消费。而多生产者时候,又多出了一个跟RingBuffer同样大小的Buffer,称为AvailableBuffer。在多生产者中,每个生产者首先通过CAS竞争获取可

以写的空间,然后再进行慢慢往里放数据,如果正好这个时候消费者要消费数据,那么每个消费者都需要获取最大可消费的下标,这个下标是在AvailableBuffer进

行获取得到的最长连续的序列下标。

单生产者

生产者单线程写数据的流程比较简单:

  1. 申请写入m个元素;
  2. 若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
  3. 若是返回的正确,则生产者开始写入元素。

在这里插入图片描述

多生产者

Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

多个生产者写入的时候:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
  3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。

Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。

在这里插入图片描述

牛逼的下标指针

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding
{...}

RingBuffer的指针(Sequence)属于一个volatile变量,同时也是我们能够不用锁操作就能实现Disruptor的原因之一,而且通过缓存行补充,避免伪共享问题。 该所谓指针是通过一直自增的方式来获取下一个可写或者可读数据,该数据是Long类型,不用担心会爆掉。

③ demo

  • 定义一个简单的消息体

    /**
     * 消息体
     */
    @Data
    public class MessageModel {
        private String message;
    }
    
  • 创建构造消息的工厂需要继承EventFactory 用于初始化RingBuffer

    /**
     * 构造EventFactory 用于初始化RingBuffer
     */
    public class HelloEventFactory implements EventFactory<MessageModel> {
        // 初始化RingBuffer
        @Override
        public MessageModel newInstance() {
            return new MessageModel();
        }
    }
    
  • 创建消费者

    /**
     * 构造EventHandler-消费者
     */
    @Slf4j
    public class HelloEventHandler implements EventHandler<MessageModel> {
        @Override
        public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
            try {
                //这里停止1000ms是为了确定消费消息是异步的
                Thread.sleep(1000);
                log.info("消费者处理消息开始");
                if (event != null) {
                    log.info("消费者消费的信息是:{}",event);
                }
            } catch (Exception e) {
                log.info("消费者处理消息失败");
            }
            log.info("消费者处理消息结束");
        }
    }
    
  • 配置Disruptor

    /**
     * 配置Disruptor
     */
    @Configuration
    public class MQManager {
    
    
        @Bean("messageModel")
        public RingBuffer<MessageModel> messageModelRingBuffer() {
            // 定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorService提供的线程来触发consumer的事件处理
            ExecutorService executor = Executors.newFixedThreadPool(2);
    
            // 指定事件工厂
            HelloEventFactory factory = new HelloEventFactory();
    
            // 指定RingBuffer字节大小,必须为2的N次方(通过位运算,加快定位的速度),否则将影响效率
            // 当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据
            int bufferSize = 4;
    
            // 单生产者模式,获取额外的性能
            // 创建disruptor方式一:使用自定义的线程池创建(已弃用)
            Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                    ProducerType.SINGLE, new BlockingWaitStrategy());
    
            // 创建disruptor方式二:传入线程工厂
            // 生产者的线程工厂
    //        ThreadFactory threadFactory = Executors.defaultThreadFactory();
    //        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, threadFactory,
    //                ProducerType.SINGLE, new BlockingWaitStrategy());
            // 设置事件业务处理器---消费者
            disruptor.handleEventsWith(new HelloEventHandler());
    
            // 启动disruptor线程
            disruptor.start();
    
            // 获取ringbuffer环,用于接取生产者生产的事件
            RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
    
            return ringBuffer;
        }
    }
    
  • 创建生产者生产消息

    public interface DisruptorMqService {
    
        /**
         * 消息
         * @param message
         */
        void sayHelloMq(String message) throws InterruptedException;
    }
    
    
    
    
    @Slf4j
    @Service
    public class DisruptorMqServiceImpl implements DisruptorMqService {
    
        @Autowired
        private RingBuffer<MessageModel> messageModelRingBuffer;
    
        @Override
        public void sayHelloMq(String message) throws InterruptedException {
            for (int l = 0; true; l++) {
                log.info("record the message: {}", message);
                //获取下一个Event槽的下标
                long sequence = messageModelRingBuffer.next();
                log.info("sequence = {}", sequence);
                try {
                    //给Event填充数据
                    MessageModel event = messageModelRingBuffer.get(sequence);
                    event.setMessage(message + l);
                    log.info("往消息队列中添加消息:{}", event);
                } catch (Exception e) {
                    log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage());
                } finally {
                    //发布Event,激活观察者去消费,将sequence传递给改消费者
                    //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
                    messageModelRingBuffer.publish(sequence);
                }
                Thread.sleep(5000);
            }
    
        }
    }
    
  • 测试

    @Autowired
    private DisruptorMqService disruptorMqService;
    
    /**
         * 项目内部使用Disruptor做消息队列
         *
         * @throws Exception
         */
    @Test
    public void sayHelloMqTest() throws Exception {
        disruptorMqService.sayHelloMq("消息到了,Hello world!");
        log.info("消息队列已发送完毕");
        // 这里停止2000ms是为了确定是处理消息是异步的
        Thread.sleep(2000);
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/171242.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DevOps利器之二(Git,Gitlab)

一、背景Git&#xff0c;Gitlab在DevOps中主要解决持续集成源码管控部分&#xff0c;本文主要从基本概念&#xff0c;实施部署两部分介绍。二、git概述https://git-scm.com/book/zh/v2 --推荐官方电子书 Git - 它是一个源代码分布式版本控制系统&#xff0c;可让开发人员在本地…

行业分享:锂电池4大生产难题,视觉检测即可有效解决

导语&#xff1a;机器视觉检测已在锂电池生产的各个环节中&#xff0c;为产品产量与质量提供可靠保障。维视智造作为锂电池视觉检测系统提供商&#xff0c;为企业提供专业、系统、稳定的锂电行业解决方案&#xff0c;可保证0漏检&#xff0c;确保安全生产&#xff0c;全面提升生…

Java总结(运算符)

1.算数运算符short s12;s1s12; &#xff08;编译不能运行)short s12;s1 2 ; (编译能运行&#xff0c;不改变变量本身的数据类型)2.逻辑运算符区分&和&&相同点&#xff1a;运算结果相同&#xff1b;当符号左边是true时&#xff0c;两者都会执行符号右边的运算不同点…

医疗数据安全实力派 | 美创科技品牌案例入选《2022年医疗行业网络安全报告》

近日&#xff0c;网络安全产业机构“数说安全”正式发布《2022年医疗行业网络安全报告》&#xff08;以下简称“报告”&#xff09;。报告对我国医疗行业信息化现状和政策、医疗行业市场发展、医疗行业需求侧及供给侧进行深度剖析。美创科技作为专业数据安全代表厂商入选医疗网…

你应该知道的机器学习模型部署细节和实施步骤

机器学习操作 (MLOps&#xff0c;Machine Learning Operations ) 是“机器学习”和“工程”的组合&#xff0c;涵盖了与生产 ML 生命周期管理有关的所有内容。 ML模型生命周期可大致分为三个阶段 文章目录技术交流设计模型开发操作步骤1&#xff1a;确定部署环境命令行终端Cond…

Arduino开发ESP8266网页服务器控制LED灯

根据板卡原理RGB三色LED对应引脚&#xff1a;int LEDR12、int LEDG14、int LEDB13;设置串口波特率为115200Serial.begin(115200);源代码如下所示&#xff1a;3.1添加头文件#include <ESP8266WiFi.h> // 提供 Wi-Fi 功能的库#include <ESP8266WebServer.h> // 提供网…

Solidity 中的数学(第 3 部分:百分比和比例)

本文是关于在 Solidity 中进行数学运算的系列文章中的第三篇 。 这次的主题是&#xff1a;百分比和比例。 介绍 金融数学从百分比开始。y的x百分比是多少&#xff1f;y占x的多少百分比&#xff1f;我们都知道答案&#xff1a;y的x百分比是x y 100&#xff0c;y是y 10…

GPIO 应用

应用层如何控制 GPIO&#xff0c; 譬如控制 GPIO 输出高电平、或输出低电平。应用层如何操控 GPIO与 LED 设备一样&#xff0c; GPIO 同样也是通过 sysfs 方式进行操控&#xff0c;进入到/sys/class/gpio 目录下。可以看到该目录下包含两个文件 export、 unexport 以及 5 个 gp…

面试 | 百度测试开发岗位面试题目回顾

一面题目 二面题目 面试经历详情 在招聘网站投递简历后&#xff0c;收到面试通知信息&#xff08;如下图&#xff0c;为保护个人隐私&#xff0c;面试岗位、地点、时间等隐去&#xff09;。虽然写的是高级测试开发工程师&#xff0c;但是面试官说他们部门的测试不一定都要写框架…

【年更分享】带你看看前端生态圈的技术趋势 state-of-css 2022 state-of-js 2022详细解读

各位前端开发者们大家好&#xff0c;我又来给大家解读最新一年的 state-of-css & state-of-js 技术调查了&#xff01; 往年的 state-of-css 和 state-of-js 的解读&#xff1a; state-of-js 2020 详细解读 state-of-js 2021 详细解读 state-of-css 2021 详细解读 一、写…

php 断点调试 PHPStorm Xdebug helper

安装与使用php的xdebug扩展 浏览器访问只包含<?php phpinfo();的php文件&#xff0c;查看php详情。页面搜索是否安装了xdebug扩展。 如未安装&#xff0c;则访问&#xff1a;xdebug安装向导&#xff0c;将phpinfo()的输出页面复制到此文本框中&#xff0c;点击页面下方的“…

Ubuntu18.04安装Anaconda

Ubuntu18.04安装Anaconda 文章目录Ubuntu18.04安装Anaconda1 下载Anaconda2 安装annaconda3 创建新环境conda环境配置指令pip环境&#xff0c;阿里源切换虚拟环境pytorch 安装不成功pycharm切换python版本4 国内conda源5 卸载anacondaReference1 下载Anaconda 下载Anaconda3-2…

Zerotier免费的虚拟局域网

Zerotier介绍 Zerotier是一款用于构建异地虚拟局域网的工具。 通过网页后台创建虚拟网络并进行管理。 通过电脑上的Zerotier客户端连接各个异地电脑到虚拟局域网&#xff0c;从而实现组网。 由后台分配虚拟ip&#xff0c;并且各个异地电脑可以通过虚拟IP对同一虚拟局域网下的其…

零基础学SQL(九、分组 GROUP BY)

目录 前置建表 ​编辑 一、分组的概念 二、分组案例 三、分组的过滤HAVING子句 前置建表 CREATE TABLE student (id int NOT NULL AUTO_INCREMENT COMMENT 主键,code varchar(255) NOT NULL COMMENT 学号,name varchar(255) DEFAULT NULL COMMENT 姓名,sex enum(男,女) DEF…

HTML知识梳理

文本格式化标签 标签语义加粗 <strong></strong>或者<b></b>更推荐使用<strong>标签加粗语义更强烈倾斜 <em></em>或者<i></i>更推荐使用<em>标签倾斜语义更强烈删除线 <del></del>或者<s><…

MySQL8源代码安装(CentOS8版本)

目标 在CentOS8上面源代码编译安装MySQL8. 下载源代码 打开MySQL下载页面&#xff1a; https://www.mysql.com/downloads/ 找到MySQL社区版本页面&#xff1a; 选择下载MySQL社区版服务器进行下载&#xff1a; 最后选择&#xff0c;MySQL源代码进行下载&#xff0c;如下图…

以element ui为例分析前端各种弹窗和对话框的使用场景与区别

文章目录摘要Dialog 对话框Drawer 抽屉Notice 通知MessageBox 弹框Popconfirm 气泡确认框Message 消息提示Notification 通知Dialog 对话框与Drawer 抽屉的区别MessageBox和Dialog的区别Message消息提示与Notification通知的区别摘要 本文研究分析element ui 中的各种弹窗和对…

【机器学习 吴恩达】2022课程笔记(持续更新)

一、机器学习 1.1 机器学习定义 计算机程序从经验E中学习&#xff0c;解决某一任务T&#xff0c;进行某一性能P&#xff0c;通过P测定在T上的表现因经验E而提高 eg&#xff1a;跳棋程序 E&#xff1a; 程序自身下的上万盘棋局 T&#xff1a; 下跳棋 P&#xff1a; 与新对手下…

Python离线下载whl文件,xxx.wh1 is not a supported wheel on this platform

0、问题 今天在安装 whl 文件的时候&#xff0c;由于电脑处于没有网络的情况&#xff0c;只能在有网络的电脑上下载好 whl 文件&#xff0c;导入之后进行离线安装 但是由于版本不匹配的问题。导致报如下的错误&#xff1a; ERROR&#xff1a;xxx.wh1 is not a supported whe…

创建成功的风格指南

作者&#xff1a;Sean Watson&#xff0c;ServiceNow 创建风格指南是一项艰巨的任务。风格指南包含语法标准、语音和语调指南、要使用和避免的词、复制模式以及产品品牌的基础知识。这些文档很容易变得非常乏味以至于难以使用&#xff0c;或者非常简单以至于无法满足需求。它们…