【源码解析】Disruptor框架的源码解析

news2025/1/19 23:26:26

Disruptor初始化

初始化Disruptor实例

        //单线程模式,获取额外的性能  
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                ProducerType.SINGLE, new BlockingWaitStrategy());

创建RingBuffer实例

    @Deprecated
    public Disruptor(EventFactory<T> eventFactory, int ringBufferSize, Executor executor, ProducerType producerType, WaitStrategy waitStrategy) {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
    }

    private Disruptor(RingBuffer<T> ringBuffer, Executor executor) {
        this.consumerRepository = new ConsumerRepository();
        this.started = new AtomicBoolean(false);
        this.exceptionHandler = new ExceptionHandlerWrapper();
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }

RingBuffer#create,根据producerType来创建不同的Producer

    public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        switch(producerType) {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
        }
    }

RingBuffer#createSingleProducer()。创建SingleProducerSequencer实例和RingBuffer实例

    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer(factory, sequencer);
    }

创建SingleProducerSequencer实例

AbstractSequencerSingleProducerSequencer的父类,初始化bufferSizewaitStrategy

    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        } else if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        } else {
            this.bufferSize = bufferSize;
            this.waitStrategy = waitStrategy;
        }
    }

创建SingleProducerSequencer实例时还初始化了一个成员变量cursor

    protected final Sequence cursor = new Sequence(-1L);

cursor赋值了一个Sequence实例对象,Sequence是标识RingBuffer环形数组的下标,同时生产者和消费者也会同时维护各自的Sequence。最重要的是,Sequence通过填充CPU缓存行避免了伪共享带来的性能损耗

public class Sequence extends RhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private static final long VALUE_OFFSET;

    public Sequence() {
        this(-1L);
    }

    public Sequence(long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }
}

class RhsPadding extends Value {
    protected long p9;
    protected long p10;
    protected long p11;
    protected long p12;
    protected long p13;
    protected long p14;
    protected long p15;

    RhsPadding() {
    }
}

class Value extends LhsPadding {
    protected volatile long value;

    Value() {
    }
}

class LhsPadding {
    protected long p1;
    protected long p2;
    protected long p3;
    protected long p4;
    protected long p5;
    protected long p6;
    protected long p7;

    LhsPadding() {
    }
}

创建RingBuffer实例

RingBufferFieldsRingBuffer父类,在创建RingBuffer实例时,会为RingBuffer的环形数组提前填充Event对象,即内存池机制

    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (this.bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        } else if (Integer.bitCount(this.bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        } else {
            this.indexMask = (long)(this.bufferSize - 1);
            this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
            this.fill(eventFactory);
        }
    }

    private void fill(EventFactory<E> eventFactory) {
        for(int i = 0; i < this.bufferSize; ++i) {
            this.entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }

    }

获取Unsafe对象。

    public static Unsafe getUnsafe() {
        return THE_UNSAFE;
    }

    static {
        try {
            PrivilegedExceptionAction<Unsafe> action = new PrivilegedExceptionAction<Unsafe>() {
                public Unsafe run() throws Exception {
                    Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
                    theUnsafe.setAccessible(true);
                    return (Unsafe)theUnsafe.get((Object)null);
                }
            };
            THE_UNSAFE = (Unsafe)AccessController.doPrivileged(action);
        } catch (Exception var1) {
            throw new RuntimeException("Unable to load unsafe", var1);
        }
    }

指定EventHandler

        //设置事件业务处理器---消费者  
        disruptor.handleEventsWith(new HelloEventHandler());

Disruptor#handleEventsWith()

  • 有多少个eventHandlers就创建多少个BatchEventProcessor实例(消费者),BatchEventProcessor消费者其实就是一个实现Runnable接口的线程实例;
  • 同一批次的每个BatchEventProcessor实例共用同一个SequenceBarrier实例
    public EventHandlerGroup<T> handleEventsWith(EventHandler<? super T>... handlers) {
        return this.createEventProcessors(new Sequence[0], handlers);
    }
	
	EventHandlerGroup<T> createEventProcessors(Sequence[] barrierSequences, EventHandler<? super T>[] eventHandlers) {
        this.checkNotStarted();
        Sequence[] processorSequences = new Sequence[eventHandlers.length];
        SequenceBarrier barrier = this.ringBuffer.newBarrier(barrierSequences);
        int i = 0;

        for(int eventHandlersLength = eventHandlers.length; i < eventHandlersLength; ++i) {
            EventHandler<? super T> eventHandler = eventHandlers[i];
            BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor(this.ringBuffer, barrier, eventHandler);
            if (this.exceptionHandler != null) {
                batchEventProcessor.setExceptionHandler(this.exceptionHandler);
            }

            this.consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        if (processorSequences.length > 0) {
            this.consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }

        return new EventHandlerGroup(this, this.consumerRepository, processorSequences);
    }

AbstractSequencer#newBarrier,序列器创建栅栏。

    @Override
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }

创建栅栏,将生产者序列器和cursorSequence放在一起。

    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length)
        {
            dependentSequence = cursorSequence;
        }
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }

Disruptor#after()

    public EventHandlerGroup<T> after(final EventHandler<T>... handlers)
    {
        final Sequence[] sequences = new Sequence[handlers.length];
        for (int i = 0, handlersLength = handlers.length; i < handlersLength; i++)
        {
            sequences[i] = consumerRepository.getSequenceFor(handlers[i]);
        }

        return new EventHandlerGroup<T>(this, consumerRepository, sequences);
    }

启动Disruptor实例

dsl.Disruptor#start

    public RingBuffer<T> start()
    {
        final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
        ringBuffer.addGatingSequences(gatingSequences);

        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository)
        {
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }

EventProcessorInfo#start,用线程池执行封装的eventprocessor

    public void start(final Executor executor)
    {
        executor.execute(eventprocessor);
    }

生产者生产数据

RingBuffer#next()

    public long next()
    {
        return sequencer.next();
    }

SingleProducerSequencer#next(),单生产者获取sequence

    @Override
    public long next()
    {
        return next(1);
    }

    /**
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue;

        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        long cachedGatingSequence = this.cachedValue;

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            long minSequence;
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

RingBuffer#publish()

    @Override
    public void publish(long sequence)
    {
        sequencer.publish(sequence);
    }

SingleProducerSequencer#publish(long),指定cursor,进行唤醒线程。

    public void publish(long sequence)
    {
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

消费者消费数据

BatchEventProcessordataProviderringBuffersequenceBarrierProcessingSequenceBarrier

    public BatchEventProcessor(
        final DataProvider<T> dataProvider,
        final SequenceBarrier sequenceBarrier,
        final EventHandler<? super T> eventHandler)
    {
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;

        if (eventHandler instanceof SequenceReportingEventHandler)
        {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }

        timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }

BatchEventProcessor#run,判断是否需要阻塞,使用事件处理器处理数据。

    @Override
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);

                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }

ProcessingSequenceBarrier#waitFor,调用等待策略。

    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

等待策略

BlockingWaitStrategy

public final class BlockingWaitStrategy implements WaitStrategy
{
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence)
        {
            lock.lock();
            try
            {
                while (cursorSequence.get() < sequence)
                {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally
            {
                lock.unlock();
            }
        }

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
        lock.lock();
        try
        {
            processorNotifyCondition.signalAll();
        }
        finally
        {
            lock.unlock();
        }
    }
}

协作者模式

Disruptor广播模式跟MQ的广播模式功能是一样的即生产者生产的消息会广播到每个消费者;不过相信大家在使用MQ的过程中更多使用的是MQ的协作者模式即同一个消费者组的消费者共同消费生产者生产的消息,同样,Disruptor也一样提供了协作者模式。

		 //4 构建多消费者协作模式工作池
        WorkerPool<LongEvent> workerPool = new WorkerPool<LongEvent>(
                ringBuffer,
                sequenceBarrier,
                new EventExceptionHandler(),
                workHandlers);
 
        //5 设置GatingSequences,创建一个生产者与消费者组之间的屏障,避免生产者生产速度赶上消费最慢的消费者
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/539869.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Midjourney|文心一格prompt教程[Text Prompt(上篇)]:品牌log、App、徽章、插画、头像场景生成,各种风格选择:科技风、运动风

Midjourney|文心一格prompt教程[Text Prompt&#xff08;上篇&#xff09;]&#xff1a;品牌log、App、徽章、插画、头像场景生成&#xff0c;各种风格选择&#xff1a;科技风、运动风 1.撰写 Text Prompt 注意事项 Midjourney 跟 ChatGPT 在 prompt 的使用上有很多不一样的地…

怎么做好媒体邀约

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好 要做好媒体邀约&#xff0c;以下是一些关键步骤和建议&#xff1a; 1. 制定媒体邀约方案&#xff1a;首先&#xff0c;确定您希望邀约的目标媒体。了解他们的受众、内容定位和报道风格&#xff0c;确保与您的品牌和目标受众…

ChatGPT 的工作原理:深入探究

本文首发于微信公众号&#xff1a;大迁世界, 我的微信&#xff1a;qq449245884&#xff0c;我会第一时间和你分享前端行业趋势&#xff0c;学习途径等等。 更多开源作品请看 GitHub https://github.com/qq449245884/xiaozhi &#xff0c;包含一线大厂面试完整考点、资料以及我的…

opencv实践项目-图片拼接之缝合线算法

目录 1. stitcher_detail执行过程2. 源码3. stitching_detail 程序接口介绍4. 执行5. 结果图 1. stitcher_detail执行过程 stitcher_detail是opencv官网提供的一个用于多福图像拼接的Demo&#xff0c;其主要过程如下&#xff1a; 1.输入待拼接图像集合&#xff1b;2.分别对每幅…

Jetson Nano调试记录:机电设备控制

边缘应用中,机电控制是一项非常重要的能力。 当我们的智能设备在远端环境中,根据所识别的状况变化去执行应对措施,更大程度度地降低对人为操作的依赖,这些都是能产生更大经济效益的应用,包括无人驾驶车、自动机械手臂等等。 实际工业应用场景中的控制元件是非常多样化的,…

惠普暗影精灵5 super 873-068rcn怎样用u盘重装系统win10

当我们的戴尔电脑出现问题的时候&#xff0c;无法进入系统要怎么重装系统win10修复呢?比较常用的就是借助u盘重装系统win10&#xff0c;需要用到装机工具。下面就给大家详细介绍下戴尔电脑怎样用u盘重装系统win10教程。 工具/原料&#xff1a; 系统版本&#xff1a;windows1…

LSTM-理解 Part-3(LSTM Networks)

之前博客中有涉及前两部分的介绍&#xff1a; 第一部分LSTM-理解 Part-1&#xff08;RNN&#xff1a;循环神经网络&#xff09; 第二部分LSTM-理解 Part-2&#xff08;RNN的局限性&#xff09; 这是其中的第三部分&#xff0c;讲解 LSTM Networks。 LSTM Networks 长短期记忆网…

【Python MySQL】零基础也能轻松掌握的学习路线与参考资料

Python是一种广泛使用的编程语言&#xff0c;MySQL是一个流行的关系数据库管理系统。学习Python和MySQL可以帮助开发人员更有效地处理数据&#xff0c;并构建可扩展和强大的Web应用程序。本文将介绍Python MySQL学习路线&#xff0c;参考资料和优秀实践。 Python MySQL学习路线…

美的智家、海尔智家,吹响新一轮AI竞赛号角

ChatGPT大行其道&#xff0c;各行各业迫不及待披上了AI大模型的“盔甲”&#xff0c;有的企业自研AI大模型&#xff0c;有的企业牵手头部科技企业&#xff0c;寻求智能产品价值的最大化&#xff0c;智能家电行业也不例外。 在国内&#xff0c;百度AI大模型文心一言一经推出就吸…

卷绕模型介绍

卷绕模型是收放卷应用的基础知识,这篇博客帮大家整理分析,有关收放卷的其它相关基础知识请参看专栏相关文章。和这篇文章相关联的知识点大家可以参看下面的博客 体积法计算卷径详细介绍 卷径计算详解(通过卷绕的膜长和膜厚进行计算)_RXXW_Dor的博客-CSDN博客有关卷绕+张力控…

LoRA:大模型的低秩自适应微调模型

对于大型模型来说&#xff0c;重新训练所有模型参数的全微调变得不可行。比如GPT-3 175B&#xff0c;模型包含175B个参数吗&#xff0c;无论是微调训练和模型部署&#xff0c;都是不可能的事。所以Microsoft 提出了低秩自适应(Low-Rank Adaptation, LoRA)&#xff0c;它冻结了预…

企业级体验:未来体验管理的价值与趋势

我从事企业级体验相关领域的工作已十六载有余&#xff0c;曾经就职的企业既有阿里巴巴、腾讯这样的互联网“大厂”&#xff0c;也有顺丰、龙湖这样的线下“传统”企业。在这些企业中&#xff0c;我所工作的场景横跨了软件、电商、互联网、物流、零售、地产、金融等诸多业务领域…

11.计算机基础-计算机网络面试题—基础知识

本文目录如下&#xff1a; 计算机基础-计算机网络 面试题一、基础知识简述 TCP 和 UDP 的区别&#xff1f;http 与 https的区别?Session 和 Cookie 有什么区别&#xff1f;详细描述一下 HTTP 访问一个网站的过程&#xff1f;https 是如何实现加密的&#xff1f;URL是什么&…

Linux下网络编程(3)——socket编程实战,如何构建一个服务器和客户端连接

经过前几篇的介绍&#xff0c;本文我们将进行编程实战&#xff0c;实现一个简单地服务器和客户端应用程序。 编写服务器程序 编写服务器应用程序的流程如下&#xff1a; ①、调用 socket()函数打开套接字&#xff0c;得到套接字描述符&#xff1b; ②、调用 bind()函数将套接字…

Deep Supervised Dual Cycle Adversarial Network for Cross-Modal Retrieval 论文阅读笔记

​# 1. 论文信息 论文名称Deep Supervised Dual Cycle Adversarial Network for Cross-Modal Retrieval作者Lei Liao 中山大学会议/出版社IEEE Transactions on Circuits and Systems for Video Technologypdf&#x1f4c4;在线pdf代码&#x1f4bb;无代码 本文是基于公共空间…

mov转换为mp4格式怎么转,多方法教程

mov转换为mp4格式怎么转&#xff1f;如果你连mov都不了解&#xff0c;那就更别说将其进行转换了。其实使用过苹果手机的人应该接触的很多&#xff0c;但是我们时常不关注这个视频格式。MOV是一种音频和视频文件格式&#xff0c;它在苹果手机上使用。它不兼容安卓或Windows电脑&…

cookie、session、JWT(Json Web Token) 的区别?

cookie、session、JWT(Json Web Token) 的区别&#xff1f; 答&#xff1a; 要从http的无状态说起&#xff0c;http是无状态的&#xff0c;也就是如果你第一次访问服务器&#xff0c;之后再次访问的时候&#xff0c;服务器是不会意识到你再次来进行访问的。不想让已经登录的用…

敏捷ACP 常用关键词整理 敏捷ACP 常用知识点整理

敏捷ACP 常用关键词整理 敏捷ACP 常用知识点整理 一、MoSCoW 1、MoSCoW &#xff1a; 读作"莫斯科"&#xff0c;适用于故事优先级的排序&#xff0c;首次出现在 3-13敏捷产品实践&#xff1a;产品待办事项列表的排序 &#xff1b; 基于价值的分析的一种技术 &#…

mac python3.9安装pyqt5、qt5、pyqt5-tools

一 pip安装 转义安装 # 一条代码就可以搞定没错&#xff0c;使用的是Rosetta2 x86_64模式安装的 arch -x86_64 python3 -m pip install PyQt5arch -x86_64 python3 -m pip install pyqt5-tools二 brew安装 arm版 以下pip命令自行更具自己环境选择pip或pip3 在安装pyqt前必须先…

【C++】set和map的底层AVL树的实现

AVL树 文章目录 前言一、AVL树的实现总结 前言 上一篇文章对 map/multimap/set/multiset 进行了简单的介绍&#xff0c;在其文档介绍中发现&#xff0c;这几个容器有个共同点是&#xff1a;其底层都是按照二叉搜索树来实现的 &#xff0c;但是二叉搜索树有其自身的缺陷&#xf…