Java集合之Disruptor 介绍

news2024/12/23 14:13:17

文章目录

  • 1 Disruptor
    • 1.1 简介
      • 1.1.1 定义
      • 1.1.2 Java中线程安全队列
      • 1.1.3 Disruptor 核心概念
    • 1.2 操作
      • 1.2.1 坐标依赖
      • 1.2.2 创建事件
      • 1.2.3 创建事件工厂
      • 1.2.4 创建处理事件Handler--消费者
      • 1.2.5 初始化 Disruptor
        • 1.2.5.1 静态类
        • 1.2.5.2 配置类
        • 1.2.5.3 Disruptor 构造函数讲解
      • 1.2.6 发布事件
        • 1.2.6.1 main方法测试
        • 1.2.6.2 使用配置方式

1 Disruptor

1.1 简介

1.1.1 定义

Disruptor 是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke’s Choice Awards(Duke 选择大奖)。

Disruptor 提供的功能类似于 KafkaRocketMQ 这类分布式队列,不过,其作为范围是 JVM(内存),Disruptor 解决了 JDK 内置线程安全队列的性能和内存安全问题,Disruptor 有个最大的优点就是快

Disruptor被设计用于在生产者消费者producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟
Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到 Disruptor,它可以带来显著的性能提升。其实 Disruptor 与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在并发、缓冲区、生产者—消费者模型、事务处理这些元素的程序来说,Disruptor 提出了一种大幅提升性能(TPS)的方案。

github 地址

Github 地址:https://github.com/LMAX-Exchange/disruptor
官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html

1.1.2 Java中线程安全队列

JDK 中常见的线程安全的队列如下:

队列名字是否有界
ArrayBlockingQueue加锁(ReentrantLock)有界
LinkedBlockingQueue加锁(ReentrantLock)有界
LinkedTransferQueue无锁(CAS)无界
ConcurrentLinkedQueue无锁(CAS)无界

从上表中可以看出:这些队列要不就是加锁有界,要不就是无锁无界。而加锁的的队列势必会影响性能,无界的队列又存在内存溢出的风险。
因此,一般情况下,我们都是不建议使用 JDK 内置线程安全队列。
Disruptor 就不一样了!它在无锁的情况下还能保证队列有界,并且还是线程安全的。

1.1.3 Disruptor 核心概念

Disruptor 核心概念:

  • Event:可以把 Event 理解为存放在队列中等待消费的消息对象。
    Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
  • EventFactory:事件工厂用于生产事件,我们在初始化 Disruptor 类的时候需要用到。
  • EventHandlerEvent 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。
    Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现
  • EventProcessorEventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)
  • Disruptor:事件的生产和消费需要用到 Disruptor 对象。
  • RingBufferRingBuffer(环形数组)用于保存事件
    如其名,环形的缓冲区。曾经 RingBufferDisruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
  • WaitStrategy:等待策略。决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
  • Producer:生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型
  • ProducerType:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似)。
  • SequencerSequencerDisruptor 的真正核心。此接口有两个实现类 - SingleProducerSequencerMultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
  • Sequence Disruptor:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。
    虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的 CPU 缓存伪共享(Flase Sharing)问题。(注:这是 Disruptor 实现高性能的关键点之一)
  • Sequence Barrier:用于保持对 RingBuffermain published SequenceConsumer 依赖的其它 ConsumerSequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

在这里插入图片描述

1.2 操作

1.2.1 坐标依赖

pom.xml

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

Gradle:

implementation 'com.lmax:disruptor:3.4.4'

1.2.2 创建事件

我们先来定义一个代表日志事件的类:LogEvent 。

事件中包含了一些和事件相关的属性,比如我们这里定义的 LogEvent 对象中就有一个用来表示日志消息内容的属性:message。

@Data
public class LogEvent {
    private String message;
}

我们这里只是为了演示,实际项目中,一个标准日志事件对象所包含的属性肯定不是只有一个 message

1.2.3 创建事件工厂

创建一个工厂类 LogEventFactory 用来创建 LogEvent 对象。
LogEventFactory 继承 EventFactory 接口并实现了 newInstance() 方法 。

public class LogEventFactory implements EventFactory<LogEvent> {
    @Override
    public LogEvent newInstance() {
        return new LogEvent();
    }
}

1.2.4 创建处理事件Handler–消费者

创建一个用于处理后续发布的事件的类:LogEventHandler 。
LogEventHandler 继承 EventHandler 接口并实现了 onEvent() 方法 。

public class LogEventHandler implements EventHandler<LogEvent> {
    @Override
    public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(logEvent.getMessage());
    }
}

EventHandler 接口的 onEvent() 方法共有 3 个参数:

  • event:待消费/处理的事件
  • sequence:正在处理的事件在环形数组(RingBuffer)中的位置
  • endOfBatch:表示这是否是来自环形数组(RingBuffer)中一个批次的最后一个事件(批量处理事件)

1.2.5 初始化 Disruptor

1.2.5.1 静态类

我们这里定义一个方法用于获取 Disruptor 对象

private static Disruptor<LogEvent> getLogEventDisruptor() {
    // 创建 LogEvent 的工厂
    LogEventFactory logEventFactory = new LogEventFactory();
    // Disruptor 的 RingBuffer 缓存大小
    int bufferSize = 1024 * 1024;
    // 生产者的线程工厂
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };
    //实例化 Disruptor
    return new Disruptor<>(
            logEventFactory,
            bufferSize,
            threadFactory,
            // 单生产者
            ProducerType.SINGLE,
            // 阻塞等待策略
            new BlockingWaitStrategy());
}

1.2.5.2 配置类

使用配置类的方式

@Configuration
public class MQManager {

    @Bean("messageModel")
    public RingBuffer<LogEvent> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
        // 生产者的线程工厂
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };

        //指定事件工厂
        LogEventFactory factory = new LogEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<LogEvent> disruptor = new Disruptor<>(factory,
        	 bufferSize, 
        	 threadFactory,
        	 ProducerType.SINGLE, 
        	 new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        //Disruptor 的 handleEventsWith 方法来绑定处理事件的 Handler 对象。
       
        disruptor.handleEventsWith(new LogEventHandler ());
      // Disruptor 可以设置多个处理事件的 Handler,并且可以灵活的设置消费者的处理顺序,串行,并行都是可以的。
       //就比如下面的代码表示 Handler1 和 Handler2 是并行执行,最后再执行 Handler3 。
       //disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
       
        // 启动disruptor线程
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件
        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }

1.2.5.3 Disruptor 构造函数讲解

Disruptor 的推荐使用的构造函数如下:

public class Disruptor<T> {
  public Disruptor(
          final EventFactory<T> eventFactory,
          final int ringBufferSize,
          final ThreadFactory threadFactory,
          final ProducerType producerType,
          final WaitStrategy waitStrategy)
  {
      this(
          RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
          new BasicExecutor(threadFactory));
  }

......
}

我们需要传递 5 个参数:

  • eventFactory:我们自定义的事件工厂。
  • ringBufferSize:指定 RingBuffer 的容量大小。
  • threadFactory:自定义的线程工厂。Disruptor 的默认线程池是自定义的,我们只需要传入线程工厂即可。
  • producerType:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。
  • waitStrategy:等待策略,决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。

ProducerType 的源码如下,它是一个包含两个变量的枚举类型

  • SINGLE:单个事件发布者模式,不需要保证线程安全。
  • MULTI:多个事件发布者模式,基于 CAS 来保证线程安全。

WaitStrategy (等待策略)接口的实现类中只有两个方法:

  • waitFor() :等待新事件的到来。
  • signalAllWhenBlocking():唤醒所有等待的消费者。
public interface WaitStrategy
{
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;
    void signalAllWhenBlocking();
}

WaitStrategy 的实现类共有 8 个,也就是说共有 8 种等待策略可供选择。
在这里插入图片描述

除了上面介绍的这个构造函数之外,Disruptor 还有一个只有 3 个参数构造函数。

使用这个构造函数创建的 Disruptor 对象会默认使用 ProducerType.MULTI(多个事件发布者模式)和 BlockingWaitStrategy(阻塞等待策略) 。

public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
    this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}

1.2.6 发布事件

1.2.6.1 main方法测试

//获取 Disruptor 对象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//绑定处理事件的Handler对象
disruptor.handleEventsWith(new LogEventHandler());
//启动 Disruptor
disruptor.start();
//获取保存事件的环形数组(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//发布 10w 个事件
for (int i = 1; i <= 100000; i++) {
    // 通过调用 RingBuffer 的 next() 方法获取下一个空闲事件槽的序号
    long sequence = ringBuffer.next();
    try {
        LogEvent logEvent = ringBuffer.get(sequence);
        // 初始化 Event,对其赋值
        logEvent.setMessage("这是第%d条日志消息".formatted(i));
    } finally {
        // 发布事件
        ringBuffer.publish(sequence);
    }
}
// 关闭 Disruptor
disruptor.shutdown();

1.2.6.2 使用配置方式

public interface DisruptorMqService {

    /**
     * 消息
     * @param message
     */
    void sayHelloMq(String message);
}

@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {

    @Autowired
    private RingBuffer<LogEvent> messageModelRingBuffer;

    @Override
    public void sayHelloMq(String message) {
        log.info("record the message: {}",message);
        //获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            //给Event填充数据
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息队列中添加消息:{}", event);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/715756.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp仿浙北汇生活微信小程序

最近给公司写了一个内部微信小程序&#xff0c;功能比较简单&#xff0c;之前是用微信小程序原生写的&#xff0c;一遍看文档一边写&#xff0c;js&#xff0c;wxml&#xff0c;wxcc&#xff0c;json分在不同文件的写法很不习惯&#xff0c;于是花了两天用uniapp重写了一遍&…

TextMining day1 电力设备运维过程中的短文本挖掘框架

电力设备运维过程中的短文本挖掘框架 III. 短文本挖掘框架的具体设计A. 预处理模块的具体设计B. 数据清洗模块的具体设计C. 表示模块的具体设计D. 数据分析模块的具体设计 IV. 案例研究A. 基于文本分类的缺陷程度判断B. 基于文本检索的缺陷处理决策 V. 结论 预处理 首先&#x…

一个光模块可以带动多少户

随着科技的快速发展&#xff0c;光模块的应用场景逐渐扩大&#xff0c;数据中心、人工智能AI的创新使我们的生活日新月异。今天我们就来看看一个小小的光模块究竟蕴藏着多大的能量&#xff01; 一、影响光模块带动户数的因素 光模块是一种实现光电转换和电光转换功能的光电子…

android Surface(1, 2)

android Surface(1, 2) android的Surface相关内容从底层依次往上分别是&#xff1a; 1.frameBuffer&#xff0c;简称fb&#xff0c;对于同一个android系统&#xff0c;可以同时存在多个frameBuffer&#xff0c;本机是fb0&#xff0c;依次外接时&#xff0c;fb1, fb2, ……fbn…

LeetCode·每日一题·445. 两数相加 II·模拟

作者&#xff1a;小迅 链接&#xff1a;https://leetcode.cn/problems/add-two-numbers-ii/solutions/2328613/mo-ni-zhu-shi-chao-ji-xiang-xi-by-xun-ge-67qx/ 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 著作权归作者所有。商业转载请联系作者获得授权&#xff…

2023.6.26-7.2 AI行业周刊(第152期):从一个热门视频,得到的人生发展感悟

上周五去上海参加2023年MWC&#xff08;世界移动通讯大会&#xff09;&#xff0c;在回无锡的路上&#xff0c;无意中刷到一个已关注博主的视频。 这个博主是2021年的时候&#xff0c;刚发第二个视频的时候&#xff0c;就一直在关注的。 从分享他从公务员辞职的经历&#xff…

【Web3】认识Web3

Web3是一种用于描述下一代互联网的概念 它指在构建一个去中心化 用户控制和加密安全的网络环境。 Web3的目标是将权利和数据掌握回归到用户手中 通过采用分布式技术和加密货币的支持 实现更加开放 公开和透明的互联网 Web的主要特点 去中化&#xff1a;Web3的核销理念是去中心…

静态时序分析: update io latency

往期文章链接: 静态时序分析: 虚拟时钟与I/O延迟约束 静态时序分析: 时钟延时(clock latency) 在CTS之前,clock是ideal的,in2reg与reg2out的path由于reg的clock network delay为0,所以时序比较容易收敛,在CTS之后,由于reg的clock network delay有了真实值(propagated…

Spring Boot 中的滚动部署是什么,如何使用

Spring Boot 中的滚动部署是什么&#xff0c;如何使用 简介 在开发和部署应用程序时&#xff0c;我们希望最小化中断&#xff0c;以确保应用程序始终可用。滚动部署是一种部署应用程序的方法&#xff0c;可以逐步将新版本部署到生产环境中&#xff0c;同时保持应用程序的可用…

Linux 6.5增加对高通开源GPU Adreno 690的支持

导读即将推出的Linux 6.5内核将把对高通Adreno 690 GPU的支持添加到开源的MSM内核图形/显示驱动程序中。A690主要用于骁龙8cx第三代&#xff08;SC8280XP&#xff09;平台&#xff0c;而联想ThinkPad X13s笔记本电脑和其他硬件也采用了该平台。 新的支持将包含近200行代码&…

基于小程序+云开发制作一个文件传输助手小程序

微信文件传输助手是真人?基于云开发制作一个文件传输助手小程序,你发给ta的小秘密,只有你自己知道。 开发步骤一、创建小程序二、云开发配置环境配置绑定云环境三、页面设计首页详情页底部弹窗四、云数据库概念云函数服务端函数文件上传

Java版事件与委托实现自动创建工厂并热加载

本文已收录于专栏 《Java》 目录 概念说明事件与委托工厂方法热加载 需求介绍代码实现1.整体结构2.工厂方法中已经存在的类工厂接口运算类工厂(其他工厂基本上是一样的)&#xff1a;目前没有加法类的工厂我们后面会添加加法类工厂然后热加载运行运算父类运算子类 3.工厂方法之外…

Java 运行jar包变更配置文件与变量

文章目录 前言实现原理不同环境的配置文件变更配置变量 前言 为实现快速搭建和开发&#xff0c;项目以Springboot框架搭建&#xff0c;springboot搭建的项目可以将项目直接打成jar包并运行&#xff0c;无需自己安装配置Tomcat或者其他服务器&#xff0c;是一种方便快捷的部署方…

PDF怎么转换成Excel?两个实用的方法给你!

如何将PDF文件转换成Excel表格的格式呢&#xff1f;在日常办公中&#xff0c;我们经常会遇到需要将PDF文件转换成Excel表格的情况。由于PDF文件具有稳定的格式特征&#xff0c;很多时候我们下载或接收到的文件都是以PDF格式呈现。那么&#xff0c;当我们需要使用Excel表格格式时…

数据结构--字符串的KMP算法

数据结构–字符串的KMP算法 朴素模式匹配算法&#xff1a; 一旦发现当前这个子串中某个字符不匹配&#xff0c;就只能转而匹配下一个子串(从头开始) 但我们可以知道&#xff1a; 不匹配的字符之前&#xff0c;一定是和模式串一致的 \color{red}不匹配的字符之前&#xff0c;一…

C++中内存拷贝函数memcpy函数使用

函数原型&#xff1a;void *memcpy(void *dest, const void *src, size_t n); 头文件&#xff1a;#include<string.h> 功能&#xff1a; 从源 src 所指的内存地址的起始位置开始拷贝 n 个字节到目标 dest 所指的 内存地址的起始位置中&#xff08;将一个 内存块 的内容…

哪种类型耳机不伤耳朵,分享几款佩戴无需入耳的骨传导耳机

骨传导耳机是目前在运动领域最火热的产品&#xff0c;也是最适合运动的耳机&#xff0c;它的原理是通过颅骨将声音转化为神经冲动&#xff0c;通过内耳传至听觉中枢&#xff0c;因此不会对耳朵造成任何损伤&#xff0c;它同时也可以让耳朵更好地听到周围的声音。能够很好的提高…

100天精通Golang(基础入门篇)——第12天:深入解析Go语言中的集合(Map)及常用函数应用

&#x1f337; 博主 libin9iOak带您 Go to Golang Language.✨ &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &#x1f30a; 《I…

Linux0.11内核源码解析-char_dev.c

目录 概述 串口终端设备 控制台终端 内存 调用接口 概述 char_dev.c文件包括字符设备文件访问函数&#xff0c;主要是有rw_ttyx(),rw_tty(),rw_memory()和rw_char()函数&#xff0c;另外还有一个设备读写函数指针表 串口终端设备 rw_ttyx()是串口终端设备读写函数&#x…

Process Explorer高级使用

工具描述 Process Explorer使用个轻量级的进程管理器&#xff0c;是由Sysinternals出品的免费工具&#xff0c;请猛击这里下载最新版本使用。 以下是官方介绍的翻译&#xff1a; “想知道是那个程序打开了某个文件或者目录么&#xff1f;现在可以找出它了。PorcessExplorer将…