高性能队列框架-Disruptor使用、Netty结合Disruptor大幅提高数据处理性能

news2024/11/28 9:47:33

高性能队列框架-Disruptor

首先介绍一下 Disruptor 框架,Disruptor是一个通用解决方案,用于解决并发编程中的难题(低延迟与高吞吐量),Disruptor 在高并发场景下性能表现很好,如果有这方面需要,可以深入研究其源码

其本质还是一个队列(环形),与其他队列类似,也是基于生产者消费者模式设计,只不过这个队列很特别是一个环形队列。这个队列能够在无锁的条件下进行并行消费,也可以根据消费者之间的依赖关系进行先后次序消费。

使用 Disruptor 框架的好处就是:速度快!

生产者向 RingBuffer 写入,消费者从 RingBuffer 中消费,基于 Disruptor 开发的系统每秒可以支持 600 万订单

下边介绍一下 Disruptor 框架中常见概念:

RingBuffer

基于数组实现的一个环,用于在不同线程间传递数据,RingBuffer 有一个 Sequencer 序号器,指向数组中下一个可用元素

在这里插入图片描述

Sequencer 序号器

该类是 Disruptor 核心,有两个实现类:

  • SingleProducerSequencer 单生产者
  • MultiProducerSequencer 多生产者

WaitStrategy 等待策略

消费者等待生产者将数据放入 RingBuffer,有不同的等待策略:

  • BlockingWaitStrategy:阻塞等待策略,最低效的策略,但其对 CPU 的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现。
  • SleepingWaitStrategy:休眠等待策略,性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景。
  • YieldingWaitStrategy:产生等待策略,性能最好,适合用于低延迟的系统,在要求极高性能且事件处理线程数小于 CPU 逻辑核心数的场景中,推荐使用。是无锁并行

Disruptor 的设计中是没有锁的,在 Disruptor 中出现线程竞争的地方也就是 RingBuffer 中的下标 Sequence,Disruptor 通过 CAS 操作来代替加锁,从而提升性能,CAS 的性能大约是加锁操作性能的 8 倍,

伪共享问题

Disruptor 中还会出现伪共享问题

参考:《高性能队列——Disruptor》——美团技术团队

缓存行

Cache 是由很多个 cache line 组成,每个 cache line 通常是 64B,并且可以有效地引用主内存中的一块地址。

Java 中 long 类型变量是 8B,因此一个 cache line 可以存储 8 个 long 类型变量

CPU 每次从主存中拉取数据时,会把相邻的数据也存入同一个 cache line,那么在访问一个 long 数组时,如果数组中的一个值被加入缓存中,那么也会加载另外 7 个

伪共享问题

在 ArrayBlockingQueue 中有 3 个成员变量:

  • takeIndex:需要被取走元素下标
  • putIndex:可被插入元素下标
  • count:队列元素数量

这 3 个变量如果在同一个 cache line 中的话,假如此时有两个线程对这 3 个变量进行操作,线程 A 修改了 takeIndex 变量,那么会导致线程 B 中这个变量所在的 cache line 失效,需要从内存重新读取

这种无法充分利用 cache line 特性的线程,成为 伪共享

解决方案就是,增大数组元素之间的间隔,使得不同线程存取的元素位于不同的 cache line 上,通过空间换时间

在jdk1.8中,有专门的注解 @Contended 来避免伪共享,更优雅地解决问题。

Disruptor 通过哪些设计来解决队列速度慢的问题了呢?

  • 环形数组 RingBuffer

    采用环形数组,空间重复利用,避免垃圾回收,并且数组对于缓存机制更加友好

  • 元素位置定位

    数组长度 2^n,通过位运算,加快定位速度

  • 无锁设计

    通过 CAS 代替锁来保证操作的线程安全

    在美团内部,很多高并发场景借鉴了Disruptor的设计,减少竞争的强度。其设计思想可以扩展到分布式场景,通过无锁设计,来提升服务性能

Disruptor 多个生产者、多个消费者原理

在 Disruptor 中,多个生产者生产数据时,每个线程获取不同的一段数组空间再加上 CAS 操作,可以避免多个线程重复写同一个元素

在读取时,如何避免读取到未写的元素呢?

Disruptor 中新创建了一个与 RingBuffer 大小相同的 available Buffer,当某个位置写入成功,就在 available Buffer 中标记为 true,通过该标记来读取已经写好的元素

Disruptor 单生产者单消费者实战

首先引入依赖:

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.4</version>
</dependency>

定义订单:

/**
 * 订单对象,生产者要生产订单对象,消费者消费订单对象
 */
public class OrderEvent {
    // 订单的价格
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

定义工厂类,用于创建订单对象:

/**
 * 建立一个工厂类,用于创建Event的实例(OrderEvent)
 */
public class OrderEventFactory implements EventFactory<OrderEvent> {
    @Override
    public OrderEvent newInstance() {
        // 生产对象
        return new OrderEvent();
    }
}

定义事件处理器,用于监听消费订单:

/**
 * 消费者
 */
public class OrderEventHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent orderEvent, long l, boolean b) {
        System.err.println("消费者:" + orderEvent.getValue());
    }
}

定义生产者,用于生产订单:

public class OrderEventProducer {

    // ringBuffer 用于存储数据
    private RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    // 生产者向 ringBuffer 中生产消息
    public void sendData(ByteBuffer data) {
        // 1. 生产者先从 ringBuffer 拿到可用的序号
        long sequence = ringBuffer.next();
        try {
            // 2.根据这个序号找到具体的 OrderEvent 元素, 此时获取到的 OrderEvent 对象是一个没有被赋值的空对象
            OrderEvent event = ringBuffer.get(sequence);
            // 3. 设置订单价格
            event.setValue(data.getLong(0));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 4. 提交发布操作
            ringBuffer.publish(sequence);
        }
    }
}

测试类:

public class Main {
    public static void main(String[] args) {
        // 初始化一些参数
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 8;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        /**
         * 参数说明:
         * eventFactory:消息(event)工厂对象
         * ringBufferSize: 容器的长度
         * executor:线程池,建议使用自定义的线程池,线程上限。
         * ProducerType:单生产者或多生产者
         * waitStrategy:等待策略
         */
        // 1. 实例化disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
                orderEventFactory,
                ringBufferSize,
                executor,
                ProducerType.SINGLE,
                new BlockingWaitStrategy());
        // 2. 向 Disruptor 中添加消费者,消费者监听到 Disruptor 的 RingBuffer 中有数据了,就会进行消费
        disruptor.handleEventsWith(new OrderEventHandler());
        // 3. 启动disruptor
        disruptor.start();
        // 4. 拿到存放数据的容器:RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        // 5. 创建生产者
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        // 6. 通过生产者向容器 RingBuffer 中存放数据
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 100; i++) {
            bb.putLong(0, i);
            producer.sendData(bb);
        }

        // 7.关闭
        disruptor.shutdown();
        executor.shutdown();
    }
}

Disruptor 多生产者和多消费者实战

定义消费者,用于从 ringBuffer 中消费订单:

public class ConsumerHandler implements WorkHandler<Order> {

    // 每个消费者有自己的id
    private String comsumerId;

    // 计数统计,多个消费者,所有的消费者总共消费了多个消息。
    private static AtomicInteger count = new AtomicInteger(0);

    private Random random = new Random();

    public ConsumerHandler(String comsumerId) {
        this.comsumerId = comsumerId;
    }

    // 当生产者发布一个 sequence,ringbuffer 中一个序号,里面生产者生产出来的消息,生产者最后publish发布序号
    // 消费者会监听,如果监听到,就会ringbuffer去取出这个序号,取到里面消息
    @Override
    public void onEvent(Order event) throws Exception {
        // 模拟消费者处理消息的耗时,设定1-4毫秒之间
        TimeUnit.MILLISECONDS.sleep(1 * random.nextInt(5));
        System.out.println("当前消费者:" + this.comsumerId + ", 消费信息 ID:" + event.getId());
        // count 计数器增加 +1,表示消费了一个消息
        count.incrementAndGet();
    }

    // 返回所有消费者总共消费的消息的个数。
    public int getCount() {
        return count.get();
    }
}

定义订单:

@Data
public class Order {

    private String id;

    private String name;

    private double price;

    public Order() {
    }
}

定义生产者,用于向 ringBuffer 中生产订单:

public class Producer {
    private RingBuffer<Order> ringBuffer;

    // 为生产者绑定 ringBuffer
    public Producer(RingBuffer<Order> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    // 发送数据
    public void sendData(String uuid) {
        // 1. 获取到可用sequence
        long sequence = ringBuffer.next();
        try {
            Order order = ringBuffer.get(sequence);
            order.setId(uuid);
        } finally {
            // 2. 发布序号
            ringBuffer.publish(sequence);
        }
    }
}

测试类:

public class TestMultiDisruptor {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建 RingBuffer,Disruptor 包含 RingBuffer
        RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, // 多生产者
                new EventFactory<Order>() {
                    @Override
                    public Order newInstance() {
                        return new Order();
                    }
                }, 1024 * 1024, new YieldingWaitStrategy());
        // 2. 创建 ringBuffer 屏障
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
        // 3. 创建多个消费者数组
        ConsumerHandler[] consumers = new ConsumerHandler[10];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new ConsumerHandler("C" + i);
        }
        // 4. 构建多消费者工作池
        WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer, sequenceBarrier, new EventExceptionHandler(), consumers);
        // 5. 设置多个消费者的 sequence 序号,用于单独统计消费者的消费进度。消费进度让RingBuffer知道
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        // 6. 启动 workPool
        workerPool.start(Executors.newFixedThreadPool(5)); // 在实际开发,自定义线程池。
        final CountDownLatch latch = new CountDownLatch(1);
        // 100 个生产者向 ringBuffer 生产数据,每个生产者发送 100 个数据,共 10000 个数据
        for (int i = 0; i < 100; i ++) {
            final Producer producer = new Producer(ringBuffer);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 先等待创建完 100 个生产者之后,再发送数据
                        latch.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    // 每个生产者发送 100 个数据
                    for (int j = 0; j < 100; j ++) {
                        producer.sendData(UUID.randomUUID().toString());
                    }
                }
            }).start();
        }
        // 把所有线程都创建完
        TimeUnit.SECONDS.sleep(2);
        // 唤醒线程让生产者开始发送数据,开始运行100个线程
        latch.countDown();
        // 等待数据发送完毕
        TimeUnit.SECONDS.sleep(10);
        System.out.println("任务总数:" + consumers[0].getCount());
    }

    static class EventExceptionHandler implements ExceptionHandler<Order> {
        //消费时出现异常
        @Override
        public void handleEventException(Throwable throwable, long l, Order order) {
        }

        //启动时出现异常
        @Override
        public void handleOnStartException(Throwable throwable) {
        }

        //停止时出现异常
        @Override
        public void handleOnShutdownException(Throwable throwable) {
        }
    }
}

Disruptor 与 Netty 结合大幅提高数据处理性能

使用 Netty 接收处理数据时,不要在工作线程上进行处理,降低 Netty 性能,可以使用异步机制,通过线程池来处理,异步处理的话,就是用 Disruptor 来作为任务队列即可

即在 Netty 收到处理数据请求时,封装成一个事件,向 Disruptor 中推送,再通过多消费者来进行处理,可以提升 Netty 处理数据时的性能,流程图如下(绿色部分为通过 Disruptor 优化部分):

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1295798.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

wps word中图片 一保存失真变糊

在wps中依次点击 文件-文字偏好设置-常规与保存 勾选不压缩文件中的图像 并 将默认目标输出设置为220ppi 即可

FacetWP WordPress网站高级筛选过滤插件(含所有扩展)

点击阅读FacetWP WordPress网站高级筛选过滤插件原文 FacetWP WordPress网站高级筛选过滤插件向电子商务网站、资源库、搜索页面等添加分面搜索。FacetWP 的过滤元素&#xff08;称为 facets&#xff09;动态调整以适应用户输入。这有助于防止出现“未找到结果”&#xff0c;从…

剪切板管理 Paste中文 for Mac

Paste是一个方便的剪贴板管理工具&#xff0c;它可以帮助你更好地组织、查找和管理剪贴板中的内容。它提供了历史记录、搜索、组织、格式处理和云同步等功能&#xff0c;使你能够更高效地使用剪贴板&#xff0c;并节省时间和精力。无论是在个人使用还是团队协作中&#xff0c;P…

Gitlab+GitlabRunner搭建CICD自动化流水线将应用部署上Kubernetes

文章目录 安装Gitlab服务器准备安装版本安装依赖和暴露端口安装Gitlab修改Gitlab配置文件访问Gitlab 安装Gitlab Runner服务器准备安装版本安装依赖安装Gitlab Runner安装打包工具安装docker安装java17安装maven 注册Gitlab Runner 搭建自动化部署准备SpringBoot项目添加一个Co…

华媒舍:引擎霸屏推广,10个技巧帮助你登上霸者!

下面我们就向您介绍引擎霸屏推广&#xff0c;及其10个技巧&#xff0c;这种技巧将帮助你在市场上获得不菲的成绩。 引擎霸屏推广引擎霸屏推广是一种营销策略&#xff0c;希望通过规模性推广产品&#xff0c;帮助品牌在顾客脑中占主导地位。这是一种依靠检索引擎等途径&#xf…

总线(什么是南北桥?您都用过哪些总线?)

什么是总线&#xff1f; 计算机系统中的总线&#xff08;Bus&#xff09;是指计算机设备和设备之间传输信息的公共数据通道&#xff0c;是连接计算机硬件系统内多种设备的通信线路&#xff0c;它的一个重要特征是由总线上的所有设备共享&#xff0c;因此可以将计算机系统内的多…

嵌入版python作为便携计算器(安装及配置ipython)

今天用别的电脑调试C&#xff0c;需要计算反三角函数时发现没有趁手工具&#xff0c;忽然想用python作为便携计算器放在U盘&#xff0c;遂想到嵌入版python 懒得自己配可以直接下载&#xff0c;使用方法见第4节 1&#xff0c;下载embeddable python&#xff08;嵌入版python&…

[mysql]linux安装mysql5.7

之前安装的时候遇到了很多问题&#xff0c;浪费了一些时间。整理出这份教程&#xff0c;照着做基本一遍过。 这是安装包: 链接&#xff1a;https://pan.baidu.com/s/1gBuQBjA4R5qRYZKPKN3uXw?pwd1nuz 1.下载安装包&#xff0c;上传到linux。我这里就放到downloads目录下面…

循环单向链表与约瑟夫问题

循环链表介绍 先不急着看约瑟夫问题是什么&#xff0c;先了解循环链表的结构&#xff0c;那什么是循环链表&#xff1f; 循环&#xff0c;顾名思义&#xff0c;从链表中第一个节点出发&#xff0c;还会遇到第一个节点&#xff0c;形成循环的一环。也就是说链表中最后一个节点…

项目实战之RabbitMQ冗余双写架构

&#x1f9d1;‍&#x1f4bb;作者名称&#xff1a;DaenCode &#x1f3a4;作者简介&#xff1a;啥技术都喜欢捣鼓捣鼓&#xff0c;喜欢分享技术、经验、生活。 &#x1f60e;人生感悟&#xff1a;尝尽人生百味&#xff0c;方知世间冷暖。 &#x1f4d6;所属专栏&#xff1a;项…

12.6每日一题(备战蓝桥杯程序的控制结构)

12.6每日一题&#xff08;备战蓝桥杯程序的控制结构&#xff09; 题目 1638: 【入门】判断正负数或零题目描述输入输出样例输入样例输出来源/分类 题解 1638: 【入门】判断正负数或零题目 1348: 【入门】求绝对值题目描述输入输出样例输入样例输出来源/分类 题解 1348: 【入门】…

【教学类-35-05】17号的学号字帖(A4竖版1份)

作品展示&#xff1a; 背景需求&#xff1a; 大四班17号男孩目前无法自主数学数字。他表示自己能够认识数字&#xff0c;但不会写。 保育老师说&#xff1a;我曾经教过他&#xff0c;抓着手示范的。但是他记不住。家里估计也不练习的。年龄还没到&#xff0c;下学期再看看能不…

SpringBoot项目访问resources下的静态资源

1.新建一个配置文件夹&#xff0c;放配置类 2.编辑 WebMvcConfig.java package com.southwind.configuration;import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import or…

期末速成数据库极简版【查询】(2)

目录 select数据查询----表 【1】筛选列 【2】where简单查询 【3】top-n/distinct/排序的查询 【4】常用内置函数 常用日期函数 常用的字符串函数 【5】模糊查询 【6】表数据操作——增/删/改 插入 更新 删除 【7】数据汇总 聚合 分类 ​ &#x1f642;&#…

2.6 A 的 LU 分解

一、A LU 线性代数很多关键的概念实际上就是矩阵的分解&#xff08;factorization&#xff09;。原始矩阵 A A A 变成两个或三个特殊矩阵的乘积。第一个分解&#xff0c;实际上也是最重要的分解&#xff0c;来自消元法。因子 L L L 和 U U U 都是三角形矩阵&#xff0c;分…

自动化测试:PO模式详解!

PO&#xff08;Page Object&#xff09;模式是一种在自动化测试中常用的设计模式&#xff0c;将页面的每个元素封装成一个对象&#xff0c;通过操作对象来进行页面的交互。 概括来说就是&#xff0c;每个页面都有对应的PO类&#xff0c;PO类中包含了页面的元素定位和操作方法。…

ArkTS语言难吗?鸿蒙指南

HarmonyOS的开发语言是ArkTS、JS(JavaScript)。 ArkTS简介 ArkTS是HarmonyOS优选的主力应用开发语言。ArkTS围绕应用开发在TypeScript&#xff08;简称TS&#xff09;生态基础上做了进一步扩展&#xff0c;继承了TS的所有特性&#xff0c;是TS的超集。因此&#xff0c;在学习…

c语言-动态内存管理

文章目录 一、为什么会有动态内存管理二、申请内存函数1、malloc2、free3、calloc4、realloc 三、常见的动态内存的错误四、练习 一、为什么会有动态内存管理 1.我们一般的开辟空间方式&#xff1a; int a 0;//申请4个字节空间 int arr[10] { 0 };//申请40个字节空间2.这样…

如何运用gpt改写出高质量的文章 (1)

大家好&#xff0c;今天来聊聊如何运用gpt改写出高质量的文章 (1)&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff1a; 如何运用GPT改写出高质量的文章 一、引言 随着人工智能技术的飞速发展&#xff0c;自然…

QT 重定向qdebug输出到自绘界面

因为在嵌入式中调试qt需要查看输出信息,特意写了一个类用户便捷查看qdebug信息 界面如下: 提供了开始,停止,保存,清空,退出功能,具体代码下文给出 文件如下 #ifndef QDEBUGREDIRECT_H #define QDEBUGREDIRECT_H /**qdebug 重定向类 定向到界面控件*李吉磊 2023.12.7* */#in…