构建高性能内存队列:Disruptor yyds~

news2025/1/23 10:24:53

Java中有哪些队列

  • ArrayBlockingQueue 使用ReentrantLock
  • LinkedBlockingQueue 使用ReentrantLock
  • ConcurrentLinkedQueue 使用CAS
  • 等等

我们清楚使用锁的性能比较低,尽量使用无锁设计。接下来就我们来认识下Disruptor。

Disruptor简单使用

github地址:Performance Results · LMAX-Exchange/disruptor Wiki · GitHub

先简单介绍下:

  • Disruptor它是一个开源的并发框架,并获得2011 Duke’s程序框架创新奖【Oracle】,能够在无锁的情况下实现网络的Queue并发操作。英国外汇交易公司LMAX开发的一个高性能队列,号称单线程能支撑每秒600万订单~
  • 日志框架Log4j2 异步模式采用了Disruptor来处理
  • 局限呢,他就是个内存队列,也就是说无法支撑分布式场景。

简单使用

数据传输对象

@Data
public class EventData {
    private Long value;
}

消费者

public class EventConsumer implements WorkHandler<EventData> {

    /**
     * 消费回调
     * @param eventData
     * @throws Exception
     */
    @Override
    public void onEvent(EventData eventData) throws Exception {
        Thread.sleep(5000);
        System.out.println(Thread.currentThread() + ", eventData:" + eventData.getValue());
    }
}

生产者

public class EventProducer {

    private final RingBuffer<EventData> ringBuffer;

    public EventProducer(RingBuffer<EventData> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void sendData(Long v){
        // cas展位
        long next = ringBuffer.next();
        try {
            EventData eventData = ringBuffer.get(next);
            eventData.setValue(v);
        } finally {
            // 通知等待的消费者
            System.out.println("EventProducer send success, sequence:"+next);
            ringBuffer.publish(next);
        }
    }
}

测试类

public class DisruptorTest {

    public static void main(String[] args) {
        // 2的n次方
        int bufferSize = 8;

        Disruptor<EventData> disruptor = new Disruptor<EventData>(
                () -> new EventData(), // 事件工厂
                bufferSize,            // 环形数组大小
                Executors.defaultThreadFactory(),       // 线程池工厂
                ProducerType.MULTI,    // 支持多事件发布者
                new BlockingWaitStrategy());    // 等待策略

        // 设置消费者
        disruptor.handleEventsWithWorkerPool(
                new EventConsumer(),
                new EventConsumer(),
                new EventConsumer(),
                new EventConsumer());

        disruptor.start();

        RingBuffer<EventData> ringBuffer = disruptor.getRingBuffer();
        EventProducer eventProducer = new EventProducer(ringBuffer);
        long i  = 0;
        for(;;){
            i++;
            eventProducer.sendData(i);
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

核心组件

基于上面简单例子来看确实很简单,Disruptor帮我们封装好了生产消费模型的实现,接下来我们来看下他是基于哪些核心组件来支撑起一个高性能无锁队列呢?

RingBuffer: 环形数组,底层使用数组entries,在初始化时填充数组,避免不断新建对象带来的开销。后续只会对entries做更新操作

Sequencer: 核心管家

  • 定义生产同步的实现:SingleProducerSequencer单生产、MultiProducerSequencer多生产

  • 当前写的进度Sequence cursor

  • 所有消费者进度的数组Sequence[] gatingSequences

  • MultiProducerSequencer可用区availableBuffer【利用空间换取查询效率】

Sequence: 本身就是一个序号器用来标识处理进度,也可以当做是一个atomicInteger; 还有另外一个特点,为了解决伪共享问题而引入的:缓存行填充。这个在后面介绍。

workProcessor: 处理Event的循环,在循环中获取Disruptor的事件,然后把事件分配给各个handler

EventHandler: 负责业务逻辑的handler,自己实现。

WaitStrategy: 消费者 如何等待 事件的策略,定义了如下策略

  • leepingWaitStrategy:自旋 + yield + sleep

  • BlockingWaitStrategy:加锁,适合CPU资源紧张(不需要切换线程),系统吞吐量无要求的

  • YieldingWaitStrategy:自旋 + yield + 自旋

  • BusySpinWaitStrategy:自旋,减少线程之前切换

  • PhasedBackoffWaitStrategy:自旋 + yield + 自定义策略

带着问题来解析代码?

1、多生产者如何保证消息生产不会相互覆盖。【如何达到互斥效果】

每个线程获取不同的一段数组空间,然后通过CAS判断这段空间是否已经分配出去。

接下来我们看下多生产类MultiProducerSequencer中next方法【获取生产序号】

// 消费者上一次消费的最小序号 // 后续第二点会讲到
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 当前进度的序号
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 所有消费者的序号 //后续第二点会讲到
protected volatile Sequence[] gatingSequences = new Sequence[0];

 public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do
        {
            // 当前进度的序号,Sequence的value具有可见性,保证多线程间线程之间能感知到可申请的最新值
            current = cursor.get();
            // 要申请的序号空间:最大序列号
            next = current + n;
  
            long wrapPoint = next - bufferSize;
            // 消费者最小序列号
            long cachedGatingSequence = gatingSequenceCache.get();
            // 大于一圈 || 最小消费序列号>当前进度
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                // 说明大于1圈,并没有多余空间可以申请
                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }
                // 更新最小值到Sequence的value中
                gatingSequenceCache.set(gatingSequence);
            }
            // CAS成功后更新当前Sequence的value
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);
        return next;
    }

2、生产者向序号器申请写的序号,如序号正在被消费,Sequencer是如何知道哪些序号是可以被写入的呢?【未消费则被覆盖如何处理】

从gatingSequences中取得最小的序号,生产者最多能写到这个序号的后一位。通俗来讲就是申请的序号不能大于最小消费者序号一圈【申请到最大序列号-buffersize 要小于/等于 最小消费的序列号】的时候, 才能申请到当前写的序号

public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
    return createWorkerPool(new Sequence[0], workHandlers);
}


EventHandlerGroup<T> createWorkerPool(
    final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);


    consumerRepository.add(workerPool, sequenceBarrier);

    final Sequence[] workerSequences = workerPool.getWorkerSequences();

    updateGatingSequencesForNextInChain(barrierSequences, workerSequences);

    return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
}

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
    if (processorSequences.length > 0)
    {
        // 消费者启动后就会将所有消费者存放入AbstractSequencer中gatingSequences
        ringBuffer.addGatingSequences(processorSequences);
        for (final Sequence barrierSequence : barrierSequences)
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}

3、在多生产者情况下,生产者是申请到一段可写入的序号,然后再写入这些序号中,那么消费者是如何感知哪些序号是可以被消费的呢?【借问提1图说明】

这个前提是多生产者情况下,第一点我们说过每个线程获取不同的一段数组空间,那么现在单单通过序号已经不够用了,MultiProducerSequencer使用了int 数组 【availableBuffer】来标识当前序号是否可用。当生产者成功生产事件后会将availableBuffer中当前序列号置为1标识可以读取。

如此消费者可以读取的的最大序号就是我们availableBuffer中第一个不可用序号-1。

初始化availableBuffer流程

public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
    // 初始化可用数组
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}
// 初始化默认availableBuffer为-1
private void initialiseAvailableBuffer()
{
    for (int i = availableBuffer.length - 1; i != 0; i--)
    {
        setAvailableBufferValue(i, -1);
    }

    setAvailableBufferValue(0, -1);
}

// 生产者成功生产事件将可用区数组置为1
public void publish(final long sequence)
{
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking();
}

private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

消费者消费流程

WorkProcessor类中消费run方法
public void run()
    {
        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                // 先通过cas获取消费事件的占有权
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                // 数据就绪,可以消费
                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    // 触发回调函数
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {
                    // 获取可以被读取的下标
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
        // ....省略
        }

        notifyShutdown();

        running.set(false);
    }
    
    
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();
        // 这个值获取的current write 下标,可以认为全局消费下标。此处与每一段的write1和write2下标区分开
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }
        // 通过availableBuffer筛选出第一个不可用序号 -1
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
    
    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        // 从current read下标开始, 循环至 current write,如果碰到availableBuffer 为-1 直接返回
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
        {
            if (!isAvailable(sequence))
            {
                return sequence - 1;
            }
        }

        return availableSequence;
    }

解决伪共享问题

什么是伪共享问题呢?

为了提高CPU的速度,Cpu有高速缓存Cache,该缓存最小单位为缓存行CacheLine,他是从主内存复制的Cache的最小单位,通常是64字节。一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量。如果你访问一个long数组,当数组中的一个值被加载到缓存中,它会额外加载另外7个。因此你能非常快地遍历这个数组。

伪共享问题是指,当多个线程共享某份数据时,线程1可能拉到线程2的数据在其cache line中,此时线程1修改数据,线程2取其数据时就要重新从内存中拉取,两个线程互相影响,导致数据虽然在cache line中,每次却要去内存中拉取。

Disruptor是如何解决的呢?

在value前后统一都加入7个Long类型进行填充,线程拉取时,不论如何都会占满整个缓存

回顾总结:Disuptor为何能称之为高性能的无锁队列框架呢?

  • 缓存行填充,避免缓存频繁失效。【java8中也引入@sun.misc.Contended注解来避免伪共享】
  • 无锁竞争:通过CAS 【二阶段提交】
  • 环形数组:数据都是覆盖,避免GC
  • 底层更多的使用位运算来提升效率

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/74127.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

R语言建立和可视化混合效应模型mixed effect model

全文下载链接&#xff1a;http://tecdat.cn/?p20631我们已经学习了如何处理混合效应模型。本文的重点是如何建立和_可视化_ 混合效应模型的结果&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。相关视频设置本文使用数据集&#xff0c;用于探索草食动物种群对…

Linux Capabilities

Linux Capabilities 入门教程&#xff1a;基础实战篇 为了对 root 权限进行更细粒度的控制&#xff0c;实现按需授权&#xff0c;Linux 引入了另一种机制叫capabilities。Capabilites 作为线程&#xff08;Linux 并不真正区分进程和线程&#xff09;的属性存在&#xff0c;每个…

极客时间Kafka - 06 Kafka 消费者组 Consumer Group 到底是什么?

文章目录1. 消费者组到底是什么&#xff1f;2. Consumer Group下的Consumer实例个数3. 消费者位移Offset4. 重平衡1. 消费者组到底是什么&#xff1f; 消费者组&#xff0c;即 Consumer Group&#xff0c;应该算是 Kafka 比较有亮点的设计了。那么何谓 Consumer Group 呢&…

JAVA SCRIPT设计模式--行为型--设计模式之State状态者模式(20)

JAVA SCRIPT设计模式是本人根据GOF的设计模式写的博客记录。使用JAVA SCRIPT语言来实现主体功能&#xff0c;所以不可能像C&#xff0c;JAVA等面向对象语言一样严谨&#xff0c;大部分程序都附上了JAVA SCRIPT代码&#xff0c;代码只是实现了设计模式的主体功能&#xff0c;不代…

ToDesk企业版使用测试:破解企业远程办公难题,更安全更高效

❤️作者主页&#xff1a;小虚竹 ❤️作者简介&#xff1a;大家好,我是小虚竹。Java领域优质创作者&#x1f3c6;&#xff0c;CSDN博客专家&#x1f3c6;&#xff0c;华为云享专家&#x1f3c6;&#xff0c;掘金年度人气作者&#x1f3c6;&#xff0c;阿里云专家博主&#x1f3…

Phoenix安装部署

目录官网地址Phoenix 部署连接二次连接&#xff0c;连接失败解决官网地址 link Phoenix 部署 上传并解压 tar 包 更名 复制 server 包并拷贝到各个节点的 hbase/lib 配置环境变量 sudo vim /etc/profile.d/my_env.sh#phoenix export PHOENIX_HOME/opt/module/phoenix e…

抗病毒面料之外,安奈儿价值内核彰显

伴随着防控措施的不断优化&#xff0c;消费板块重回资本视野&#xff0c;其中童装巨头安奈儿因将推出“抗病毒抗菌面料”备受关注&#xff0c;14天收获10个涨停板。 目前安奈儿凭借抗病毒面料吸引了无数资本的目光&#xff0c;但这也是一把双刃剑。虽然抗病毒面料为安奈儿带来了…

数字化棋牌室 | 会员管理预约系统 | 棋牌室小程序

棋牌室在城市与农村都是部分老年人与年轻人的经常去的娱乐场所&#xff0c;以前这些场所里总是挤满了人&#xff0c;但现在越来越多的棋牌室即使环境装修的漂亮、设备高端完善等依然面对流量难题及管理难题&#xff0c;同时由于棋牌室具有社区属性&#xff0c;因此也有不少商家…

ARM微控制器MK24FN1M0VDC12、MKV10Z128VLH7低功耗MCU资料

MK24FN1M0VDC12 IC MCU 32BIT 1MB FLASH 121XFBGA 说明&#xff1a;Kinetis K2x 32位微控制器是低功耗mcu&#xff0c;通过智能片上集成大大节省了BOM。这些mcu基于ArmCortex-M4核心&#xff0c;提供完整和可选的高速USB 2.0 on - on - go (OTG)&#xff0c;包括无晶体设备功能…

干货 | 数字经济创新创业——数据是数字经济的基础

下文整理自清华大学大数据能力提升项目能力提升模块课程“Innovation & Entrepreneurship for Digital Economy”&#xff08;数字经济创新创业课程)的精彩内容。主讲嘉宾&#xff1a;Kris Singh: CEO at SRII, Palo Alto, CaliforniaVisiting Professor of Tsinghua Unive…

RTSP 协议漫谈,揭秘 RTSP 协议内幕

RTSP&#xff08;Real Time Streaming Protocol&#xff09;实时流传输协议&#xff0c;定义在 RFC2326&#xff0c;是 TCP/IP 协议体系中的一个应用层协议&#xff0c;由哥伦比亚大学、网景和 RealNetworks 公司提交的 IETF RFC 标准。该协议定义了一对多应用程序如何有效地通…

Linux用户管理详解

Linux用户管理详解 前言 Linux用户即Linux的使用者&#xff0c;是指使用Linux系统或服务的人员&#xff0c;通常用户对应拥有一个用户账号&#xff0c;并对用户名识别。正常登录Linux系统&#xff0c;本质是登录系统&#xff0c;但是Linux支持同一时间多个用户同时登陆&#x…

JSP ssh服装定制电子商务系统myeclipse开发mysql数据库MVC模式java编程计算机网页设计

一、源码特点 JSP ssh服装定制电子商务系统是一套完善的web设计系统&#xff08;系统采用ssh框架进行设计开发&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采 用B/S模式开发。开发环境为TOMCA…

大一学生《Web编程基础》期末网页制作 HTML+CSS+JavaScript 网页设计实例 企业网站制作

HTML实例网页代码, 本实例适合于初学HTML的同学。该实例里面有设置了css的样式设置&#xff0c;有div的样式格局&#xff0c;这个实例比较全面&#xff0c;有助于同学的学习,本文将介绍如何通过从头开始设计个人网站并将其转换为代码的过程来实践设计。 ⚽精彩专栏推荐&#x1…

【Docker】第 1 章 Docker概述

1.1 Docker是什么 使用最广泛的开源容器引擎一种操作系统级的虚拟化技术依赖于Linux内核特性&#xff1a;Namespace&#xff08;资源隔离&#xff09;和Cgroups&#xff08;资源限制&#xff09; 一个简单的应用程序打包工具 1.2 Docker设计目标 提供简单的应用程序打包工具开…

传奇架设需要准备以下条件

传奇架设其实很简单 很多网友非常爱玩这款游戏&#xff0c;可能还有朋友不知道怎么架设这款游戏 今天特意写篇传奇架设教程&#xff0c;希望大家都能打造出真正属于自己的传奇 首先传奇架设需要准备以下几个软件 准备工具&#xff1a; 1、传奇服务端&#xff08;版本&#…

select组件切换tags时,联动的select组件内容清空

前言 记录在项目开发中遇到问题的解决方案&#xff0c;方便以后遇到快速解决&#xff01; 问题描述 在该react hooks 页面中&#xff0c;图纸计划附件表格是一个子组件。其中 【单体】和【专业】两个下拉select选择框&#xff0c;数据来源依赖于【厂区】。 后端给的接口&#…

9微电网两阶段鲁棒优化经济调度方法(MATLAB程序)

联系2645521500 复现文章&#xff1a; 微电网两阶段鲁棒优化经济调度方法——刘一欣&#xff08;中国电机工程学报&#xff09; 主要内容&#xff1a; 针对微电网内可再生能源和负荷的不确定性&#xff0c;建立了min-max-min 结构的两阶段鲁棒优化模型&#xff0c;可得到最…

SpringCloud微服务网关gateway

SpringCloud微服务网关gateway 网关简介 大家都都知道在微服务架构中&#xff0c;一个系统会被拆分为很多个微服务。那么作为客户端要如何去调用这么多的微服务呢&#xff1f; 如果没有网关的存在&#xff0c;我们只能在客户端记录每个微服务的地址&#xff0c;然后分别去用…

word电子版签名

word电子版签名 问题 word如何实现电子版签名 解决方案一 1 在纸上使用签字笔签名并进行拍照 2 对图片进行使用电子扫描 对于图片进行使用电子扫描&#xff0c; 可选择的app与微信小程序较多&#xff0c;可自行选择&#xff0c;对于app&#xff0c; 笔者推荐全能扫描王&a…