【Java并发】聊聊Disruptor背后高性能的原理

news2025/1/6 19:48:01

为什么需要Disruptor

对于单机生产者消费者来说,JUC本身提供了阻塞队列,ArrayBlockingQueue、LinkedBlockingQueue 等,但是为了保证数据安全,使用了reentrantLock进行加锁操作会影响性能,另一方面,本身如果生产者生产数据过快会导致,内存溢出问题。以及采用数据实现会有伪共享问题。

Disruptor 原理和应用场景

那么Disruptor是如何进行设计的?

  • 环形数组结构
  • 元素位置定位
  • 无锁设计
  • 利用缓存行填充解决伪共享问题
  • 实现了基于事件驱动的生产者消费者模型(观察者模式)

RingBuffer数据结构

在这里插入图片描述
唤醒数组其实就是一个自定义大小的环形数组,有一个序列号 sequence 用以指向下一个可用的元素,需要保证数组的长度必须是2的N次幂,这样就可以通过sequence % length 或者 通过 sequence &(length -1) 可以直接获取到下标位置。其实hashmap也是同样的原理。

那么当环形数组数据满之后,就会覆盖0号位置,具体使用什么策略。提供了4种。

  • BlockingWaitStrategy:不覆盖数据,等待
  • SleepingWaitStrategy
  • YieldingWaitStrategy
  • BusySpinWaitStrategy

这里了解即可,用到的时候 在查

缓存行

abstract class RingBufferPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}
	
 
 
abstract class RingBufferFields<E> extends RingBufferPad
{
    ......
}
 
 
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;
    ......

刚开始看 发现什么 写一堆这些px 有什么用,其实是充分利用计算机cpu core 的 cache。利用填充行填充。比如我们定义了一个10个长度的long数组,不同一个元素一个元素从内存加载搭配CPU cache的。而是一次性固定加载整个缓存行。

在这里插入图片描述
所以如果只是一个单独的long 变量,可能在多个线程操作下,前后的变量,可能来回的回写和加载,不断的导致INITIAL_CURSOR_VALUE 从内存到 CPU cache,为了防止,所以在前后 添加7个long 类型变量。就会一只在CPU cache中。

无锁的并发-生产者=消费者模型

其实我们可以通过一个数组模拟出一个生产者消费者模型,但是这种方式在单生产者单消费者的情况下,其实没有问题,在多线程情况下,其实没有办法解决,需要加锁的方式,但是加锁的话,其实从一定程度上降低了系统的整体性能,比如说 ArrayBlockingQueue 中 添加元素和获取元素 都是通过lock的方式加锁。

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

那么Disruptor是如何实现的?
对生产者来说: 往队列中添加数据之前,可以先申请可用空闲存储单元,并且是批量申请连续N个单元,申请之后,后续就不用往队列中添加元素,不用加锁。并且申请的存储单元是这个线程独享的。不过申请存储单元的过程需要加锁。

对于消费者来说:也是一次获取多个可读的范围,申请一批连续可读的存储单元。

比如对于 生产者A申请到一组连续的存储单元,3到6,生产者B申请到7到9的存储单元。那么在3到6没有完全写入数据之前,7到9是没有办法读取。这是一个弊端。
在这里插入图片描述

Code

构建数据

/**
 * @author qxlx
 * @date 2024/1/29 22:39
 */
public class Data  {

    private String uid;

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }
}

生产者

public class EventProducer {

    private RingBuffer<Data> ringBuffer;

    public EventProducer(RingBuffer<Data> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void send(long value,String name) {
        long next = ringBuffer.next();
        Data data = ringBuffer.get(next);

        // 写入消息数据
        data.setUid(name);

        //发布事件
        ringBuffer.publish(next);
    }

}

数据工厂

public class OrderEventFactory implements EventFactory<Data> {

    @Override
    public Data newInstance() {
        return new Data();
    }
}

消费者

public class EventHanderConsumer implements EventHandler<Data> {

    @Override
    public void onEvent(Data data, long l, boolean b) throws Exception {
        System.out.println("消费者获取数据"+data.getUid());
    }
}
    public static void main(String[] args) {
       // 构建disruptor对象 
        Disruptor<Data> disruptor = new Disruptor<Data>(
                new OrderEventFactory(),
                1024 * 1024,
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy() //等待策略
        );
		
		// 消费者
        disruptor.handleEventsWith((EventHandler<? super Data>) new EventHanderConsumer());
        
        // 启动
        disruptor.start();

		// 生产数据
        RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
        EventProducer eventProducer = new EventProducer(ringBuffer);

        for (int i = 0; i < 100; i++) {
            eventProducer.send(1,"fix"+i);
        }
    }

应用场景

在实际的应用场景中,比如我们分库分表,user表有8个子表。那么如何保证每个子表生产的uid是固定增长的。一种方式是使用分布式id 雪花算法,另一种方式则可以通过将每个子表的id 每次都+8。比如表1的id是从1 9。表二 从2 10 这样就可以通过固定的步长确定。

好了本篇其实主要简单介绍了其核心原理,具体的大家可以看源代码。

推荐阅读:https://tech.meituan.com/2016/11/18/disruptor.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1431563.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Kafka相关内容复习

为什么要用消息队列 解耦 允许你独立的扩展或修改两边的处理过程&#xff0c;只要确保它们遵守同样的接口约束。 可恢复性 系统的一部分组件失效时&#xff0c;不会影响到整个系统。消息队列降低了进程间的耦合度&#xff0c;所以即使一个处理消息的进程挂掉&#xff0c;加入队…

XCTF:3-1[WriteUP]

从题目中获取文件 使用file命令查看文件类型 修改后缀为.rar后进行解压缩 再次使用file命令查询该文件的类型 再次修改后缀为.pcap或者.pcapng 使用wireshark打开&#xff0c;直接搜索flag字样 在多个数据包里发现了flag.rar、flag.txt等文件 尝试使用http导出文件 有一个fl…

小程序<scroll-view/>组件详解及使用指南

目录 引言小程序的流行和重要性scroll-view 组件在小程序中的作用和优势什么是 scroll-view 组件scroll-view 组件的基本概念scroll-view 组件的基本功能scroll-view 组件的属性与用法scroll-view 组件的常用属性参考代码scroll-view 组件的滚动事件scrolltoupper

【开源】SpringBoot框架开发大学计算机课程管理平台

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 实验课程档案模块2.2 实验资源模块2.3 学生实验模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 实验课程档案表3.2.2 实验资源表3.2.3 学生实验表 四、系统展示五、核心代码5.1 一键生成实验5.2 提交实验5.3 批阅实…

ESU毅速丨为什么增材制造广受关注?

随着科技的飞速发展&#xff0c;增材制造3D打印技术逐渐成为制造业的新宠。包括航空航天、汽车、家电、电子等各行业都在积极拥抱3D打印&#xff0c;为什么3D打印能引起制造业广泛关注与应用&#xff1f;它的主要优势有哪些&#xff1f; 首先&#xff0c;3D打印减少浪费。3D打印…

私有化部署的局域网即时通讯工具

在当今快节奏的企业环境中&#xff0c;高效的内部通信成为提高团队协作和工作效率的关键。而企业内部通信软件作为实现实时协作和快速沟通的利器&#xff0c;WorkPlus以其领先的功能和卓越的性能&#xff0c;助力企业打造高效团队沟通协作的新时代。 为何选择WorkPlus作为企业内…

简单说说-docker网络类型

概述 容器网络是指容器之间或非 Docker 工作负载之间连接和通信的能力。容器默认启用网络&#xff0c;并且可以建立传出连接。容器不知道它所连接的网络类型&#xff0c;容器只能看到带有 IP 地址、网关、路由表、DNS 服务和其他网络详细信息的网络接口。也就是说&#xff0c;…

【Docker与微服务】基础篇

1 Docker简介 1.1 docker是什么 1.1.1 问题&#xff1a;为什么会有docker出现&#xff1f; 假定您在开发一个项目&#xff0c;您使用的是一台笔记本电脑而且您的开发环境具有特定的配置。其他开发人员身处的环境配置也各有不同。您正在开发的应用依赖于您当前的配置且还要依…

ElementUI Data:Table 表格

ElementUI安装与使用指南 Table 表格 点击下载learnelementuispringboot项目源码 效果图 el-table.vue&#xff08;Table表格&#xff09;页面效果图 项目里el-table.vue代码 <script> export default {name: el_table,data() {return {tableData: …

基于微信小程序的医保行政执法案件管理系统

本系统设计的是一个医保行政执法的网站&#xff0c;此网站使用户实现了不需出门就可以在手机或电脑前进行网上查询需求信息等。 用户在注册登陆后&#xff0c;在客户端可以实现&#xff1b;案件信息、结案归档、我的等。然而管理员则可以在服务端直接管理&#xff1b;个人中心、…

学成在线:采用XXL-JOB任务调度方案使用FFmpeg处理视频转码业务

分片技术方案 概述 XXL-JOB并不直接提供数据处理的功能&#xff0c;它只会给所有注册的执行器分配好分片序号&#xff0c;在向执行器下发任务调度的同时携带分片总数和当前分片序号等参数 设计作业分片方案保证多个执行器之间不会查询到重复的任务,保证任务不会重复执行 任…

如何标准化地快速编辑文档

介绍个公文类的文档技巧吧&#xff0c;尤其在国企、机关、有ISO管理体系内控要求的会议记录、公文写作等&#xff0c;要求大同小异&#xff0c;一般都是中规中矩的【GB/T 9704—2012】&#xff0c;其实国标本身就是经过长期检验&#xff0c;证明是最规范合理&#xff0c;阅读效…

交友系统---让陌生人变成熟悉人的过程。APP小程序H5三端源码交付,支持二开。

随着社交网络的发展和普及&#xff0c;人们之间的社交模式正在发生着深刻的变革。传统的线下交友方式已经逐渐被线上交友取而代之。而同城交友正是这一趋势的产物&#xff0c;它利用移动互联网的便利性&#xff0c;将同城内的人们连接在一起&#xff0c;打破了时空的限制&#…

uniapp基于Android的环境保护环保商城系统生活垃圾分类 小程序_rsj68

本环境保护生活App是为了提高用户查阅信息的效率和管理人员管理信息的工作效率&#xff0c;可以快速存储大量数据&#xff0c;还有信息检索功能&#xff0c;这大大的满足了用户和管理员这两者的需求。操作简单易懂&#xff0c;合理分析各个模块的功能&#xff0c;尽可能优化界面…

SpringBoot整合Flowable最新教程(一)Flowable介绍

一、Flowable 入门介绍 代码实现文章&#xff1a;SpringBoot整合Flowable最新教程&#xff08;二&#xff09; 官网地址&#xff1a;https://www.flowable.org/   Flowable6.3中文教程&#xff1a;中文教程地址   可以在官网下载对应的jar包在本地部署运行&#xff0c;官方…

STM32L4学习

STM32L4系列是围绕Cortex-M4构建&#xff0c;具有FPU和DSP指令集&#xff0c;主频高达80MHz。 STM32CubeL4简介 STM32Cube 是 ST 提供的一套性能强大的免费开发工具和嵌入式软件模块&#xff0c;能够让开发人员在 STM32 平台上快速、轻松地开发应用。它包含两个关键部分&…

VS2017+Qt运行打开黑窗口

右键工程属性&#xff0c;找到链接器->系统->改为控制台即可

【5G SA流程】5G SA下终端完整注册流程介绍

博主未授权任何人或组织机构转载博主任何原创文章,感谢各位对原创的支持! 博主链接 本人就职于国际知名终端厂商,负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作,目前牵头6G算力网络技术标准研究。 博客内容主要围绕: 5G/6G协议讲解 …

09.领域驱动设计:深入学习6本经典推荐书籍

目录 前言 1、《领域驱动设计&#xff1a;软件核心复杂性应对之道》 1.作者简介 2.内容简介 3.推荐理由 4.豆瓣链接 ​编辑 2、《实现领域驱动设计》 1.作者简介 2.内容简介 3.推荐理由 4.豆瓣链接 ​编辑 3、《领域驱动设计精粹》 1.作者简介 2.内容简介 3.推…

缓存的概念

文章目录 一、系统缓存buffer与cachecache 的保存位置cache 的特性 二、用户层缓存DNS缓存 三、浏览器缓存过期机制最后修改时间Etag标记过期时间 expires混合使用和缓存刷新缓存刷新 cookie和session 四、CDN缓存什么是CDN用户请求CDN流程利用 302 实现转发请求重定向至最优服…