Java多线程篇(13)——FutureTask、Disruptor的使用

news2025/2/27 20:48:14

文章目录

  • FutureTask
    • CompletionService
    • CompletableFuture
  • Disruptor
    • Disruptor 核心概念
    • 运行流程
    • 不同生产者模式的区别
    • Disruptor设计精髓

FutureTask

现有一个场景,10个线程执行10个任务,然后主线程获取任务结果。

比较广泛的一个说法就是,runnable是没有返回值的线程,callable是有返回值的线程。所以最先想到的是用callable接口去获取线程返回值。

实际上,"runnable是没有返回值的线程,callable是有返回值的线程"这种说法。
我个人认为是错误的,我认为运行线程的方式只有一种,就是实现runnable接口!不管你是new Thread,还是使用线程池最终都要实现runnable接口。就算是用callable接口的方式也不例外。

callable的返回值是如何实现的?
提交的callable作为成员变量封装到RunnableFuture(最常用的实现类就是FutureTask),而RunnableFuture又继承自Runnable,所以其实线程池真正提交的还是一个Runnable(RunnableFuture)。
RunnableFuture.run方法调用了callable.call方法,并将call方法结果存起来,唤醒等待结果的线程。
RunnableFuture.get方法如果有结果了就直接返回,如果没有就自旋/阻塞等待唤醒。

在这里插入图片描述

CompletionService

单纯使用FutureTask有一个最大的问题就是,在获取任务结果的时候,如果前一个任务还没有结果,即使后面的任务有结果了也无法打印出来。所以有没有那么一种办法,可以让10个任务,谁先完成了就谁先打印。因此,CompletionService 来了。
在这里插入图片描述
这个实现原理不用看源码也基本可以猜到就是在原来的基础上多加一个阻塞队列,将任务结果统一存入阻塞队列,先进先出。

CompletableFuture

FutureTask还有一个问题就是会阻塞主线程。所以有没有那么一种办法,可以不阻塞主线程(异步回调)。主线程只管提交任务,提交完后就不管了,无需等待任务结果,任务完成后自己回调后续操作。因此,CompletableFuture 来了。
在这里插入图片描述
另外,CompletableFuture还支持串行执行
在这里插入图片描述

通过打印的信息得知,CompletableFuture使用的线程池是ForkJoinPool.commonPool

除此之外,CompletableFuture还支持并行执行
在这里插入图片描述


Disruptor

本篇只是简单的记录一下Disruptor的基本设计,不涉及太深的源码分析

附上一篇美团的技术文章:https://tech.meituan.com/2016/11/18/disruptor.html

Disruptor 核心概念

  • Disruptor(总体执行入口):执行引用。
  • RingBuffer(环形缓冲区):基于数组的内存级别环形数组缓存。
  • Sequence(序号分配器):通过顺序递增的方式,一个Sequence对应一个事件,同时还能消除伪共享。
  • Sequencer(数据传输器):有两个实现类,SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现)。主要作用是实现生产者和消费者之间的并发算法。
  • SequenceBarrier(消费者屏障):用于控制生产者和消费者之间的平衡。
  • WaitStrategy(消费者等待策略):当无可消费事件时的等待策略。(目前数组满需要等待时调用LockSupport.parkNanos(1),不过看注释后续可能会与等待策略挂钩)
  • Event:使用者自定义的事件数据结构。
  • EventHandler:消费者逻辑。
  • EventProcessor:实现了Runnable,并封装了EventHandler,意味着可以线程方式执行消费逻辑。
    在这里插入图片描述

使用案例

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.4</version>
        </dependency>

在这里插入图片描述

运行流程

1、构造函数

public Disruptor(
            final EventFactory<T> eventFactory, //事件工厂
            final int ringBufferSize, //ringBuffer环形数组大小
            final ThreadFactory threadFactory, //线程工厂
            final ProducerType producerType, //生产者类型,SINGLE,MULTI两种类型,不同类型有不同的sequencer实现
            final WaitStrategy waitStrategy) //等待策略
    {
        this(RingBuffer.create(
                               producerType, eventFactory, ringBufferSize, waitStrategy),
                new BasicExecutor(threadFactory));
    }

其中等待策略有如下几种
在这里插入图片描述

2、disruptor.handleEventsWith()
在这里插入图片描述
这一步就是将消费者逻辑(EventHandler)封装到消费者线程处理器(EventProcessor),并将所有消费线程处理器加入consumerRepository列表。

3、disruptor.start()
在这里插入图片描述
这一步就是启动consumerRepository中的所有消费者线程。

4、消费者线程
以BatchEventProcessor为例
在这里插入图片描述
在这里插入图片描述
消费者线程的逻辑就是不断的循环,从环形数组中获取事件消费,如果没有事件可以获取了就根据不同的等待策略进行等待。

5、disruptor.publishEvent()
在这里插入图片描述
发布一个事件逻辑挺简单的,就是获取一个序号(槽位),然后填充槽位上的事件数据,最后就是发布唤醒等待消费者。

不同生产者模式的区别

一个生产者
一个生产者的情况比较简单

写数据
1、申请写入m个元素
2、判断是否覆盖未消费数据,若无则写入数据

读数据
1、申请读取到序号n
2、从reader cursor开始消费数据到n

多生产者
多个生产者的情况下,为防止多个线程重复写同一个元素。Disruptor的做法是:每个线程获取不同的一段数组空间进行操作。对应的实现方式是,在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去,如果分配了就取下一段。
但是这会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor的做法是:引入了一个与Ring Buffer大小相同的buffer——available Buffer。当某个位置成功写入时,就把相应位置标记为写入成功。读取的时候,通过遍历available Buffer来获取一段最长的连续已写槽位。

写数据:
1、申请写入m个元素
2、若有m个元素可以写,则返回最大的序号,每个生产者会通过CAS被分配一段独享的空间,各自写入自己的空间
3、标记available Buffer对应位置为成功写入

读数据:
1、申请读取到序号n
2、若此时 write cursor > n,说明这时无法确定连续可读的最大下标。就从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置
3、消费者读取元素

Disruptor设计精髓

1、环形数组的数据结构与初始化时提前分配事件内存,可以实现槽位和事件对象的复用,减少垃圾回收次数
2、递增序号配合长度2次幂的数组长度可通过位运算替换求余
3、缓存行填充解决伪共享问题
4、无锁设计,每个生产者或者消费者会申请一个空间,不同线程在不同空间操作
5、实现了基于事件驱动的生产者消费者模型(观察者模式)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1158323.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代码随想录算法训练营第三十九天丨 动态规划part02

62.不同路径 思路 动态规划 机器人从(0 , 0) 位置出发&#xff0c;到(m - 1, n - 1)终点。 按照动规五部曲来分析&#xff1a; 确定dp数组&#xff08;dp table&#xff09;以及下标的含义 dp[i][j] &#xff1a;表示从&#xff08;0 &#xff0c;0&#xff09;出发&#…

【每日一题】参加会议的最多员工数

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;内向基环树拓扑排序分类讨论内向基环树分类讨论基环长度大于 2基环长度等于 2 功能实现 写在最后 Tag 【内向基环树拓扑排序分类讨论】【图】【2023-11-01】 题目来源 2127. 参加会议的最多员工数 题目解读 员工只有…

阿昌教你如何优雅的数据脱敏

阿昌教你如何优雅的数据脱敏 Hi&#xff0c;我是阿昌&#xff0c;最近有一个数据脱敏的需求&#xff0c;要求用户可自定义配置数据权限&#xff0c;并对某种类型数据进行脱敏返回给前端 一、涉及知识点 SpringMVCJava反射Java自定义注解Java枚举 二、方案选择 1、需求要求…

呼叫中心的重要考核指标

呼叫中心在运营过程中越来越精细化&#xff0c;在信息化管理的时代&#xff0c;呼叫中心系统是必不可少的&#xff0c;而呼叫中心的管理人员为了提升运营效率&#xff0c;通常会根据业务目标设置各种业务的考核指标&#xff0c;而我也根据OKCC在呼叫中心项目运营过程中的经验&a…

【双十一预售】玩得越来越大了...

双十一又又又到了 剁手带来的快乐终究是短暂的 让自己变得更优秀才是长远的快乐 当今大环境 工作难找&#xff0c;钱难赚 只有不断学习与成长 方能应对未来的各种不确定性 知了堂双十一预售 0.11元畅享三大权益 助你快速实现自我提升 突破成长瓶颈 https://appyqk1x…

1. 网络之网络通信基础

网络通信基础 文章目录 网络通信基础1. IP地址2. 端口号3. 协议3.1 三要素3.2 作用 4. 五元组5. 协议分层5.1 OSI七层模型5.2 TCP/IP 五层模型5.2.1 应用层5.2.2 传输层5.2.3 网络层5.2.3 数据链路层5.2.5 物理层 6. 封装和分用6.1 发送方 - 封装6.2 中间转发6.3 接收方 - 分用…

codeMirror代码编辑器,如何定位并在编辑区域输入内容

背景 最近在写UI自动化&#xff0c;发现普通的方法不能在CodeMirror编辑器里面输入内容&#xff0c;只能通过JS的方式输入内容。 于是琢磨了一下selenium和playwright这2种自动化工具&#xff0c;在CodeMirror编辑器里面输入内容的差别。 注意&#xff1a;这里在定位CodeMirr…

轧钢厂安全生产方案:AI视频识别安全风险智能监管平台的设计

一、背景与需求 轧钢厂一般都使用打包机对线材进行打包作业&#xff0c;由于生产需要&#xff0c;人员需频繁进入打包机内作业&#xff0c;如&#xff1a;加护垫、整包、打包机检修、调试等作业。在轧钢厂生产过程中&#xff0c;每个班次生产线材超过300件&#xff0c;人员在一…

【OpenCV实现图像找到轮廓的不同特征,就像面积,周长,质心,边界框等等。】

文章目录 概要图像矩凸包边界矩形 概要 OpenCV是一个流行的计算机视觉库&#xff0c;它提供了许多图像处理和分析功能&#xff0c;其中包括查找图像中物体的轮廓。通过查找轮廓&#xff0c;可以提取许多有用的特征&#xff0c;如面积、周长、质心、边界框等。 以下是几种使用…

双目视觉检测 KX02-SY1000型测宽仪 有效修正和消除距离变化对测量的影响

双目视觉检测的基本原理 利用相机测量宽度时&#xff0c;由于单个相机在成像时存在“近大远小”的现象&#xff0c;并且单靠摄入的图像无法知道被测物的距离&#xff0c;所以由被测物的跳动导致的被测物到工业相机之间距离变化&#xff0c;使测量精度难以提高。 因此测宽仪需…

Vue项目创建与启动(2023超详细的图文教程)

目录 一、下载node.js 二、下载vue-cli与webpack插件 三、项目初始化(项目配置详细信息) 四、项目启动 五、Vue项目工程结构&#xff08;扩展知识&#xff09; 一、下载node.js 1.检测是否已经安装过node.js 打开控制台,输入 npm -v如果有会显示对应版本 如果没有会显示…

RocketMQ消费者和队列对应关系

参考 RocketMQ 5.0 POP 消费模式探秘 https://www.cnblogs.com/alisystemsoftware/p/15535925.html 旧版本MQ结论 消费者应用和topic队列一对多的关系。 &#xff08;一个消费组consumer group里&#xff0c;一个消费者应用可以消费多个队列的消息。一个队列的消息只能被一个…

矩阵分块例子

有如下矩阵A和B 对A列分块, B行分块后结果如下 对A行分块, B列分块后结果如下

企业网络带宽使用情况检查技巧

想要提高网络性能的企业通常会考虑限制对占用带宽的应用程序&#xff08;如社交媒体和视频流应用程序&#xff09;的访问&#xff0c;但对于那些真正需要获得高效网络的人来说&#xff0c;这还不够&#xff0c;您需要定期跟踪带宽使用情况。 虽然有许多工具可以帮助您检查网络…

Webpack的代码分割(code splitting)

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

干洗店服务预约小程序有什么作用

要说干洗店&#xff0c;近些年的需求度非常高&#xff0c;一方面是人们生活品质提升&#xff0c;另一方面则是各种服饰对洗涤要求提升等&#xff0c;很多人的衣服很多也会通过干洗店进行清洁。 而对从业商家来说&#xff0c;市场庞大一方面需要不断进行市场教育、品牌提升&…

Python自动化测试实战篇:unittest框架详解

为什么要学习unittest 按照测试阶段来划分&#xff0c;可以将测试分为单元测试、集成测试、系统测试和验收测试。单元测试是指对软件中的最小可测试单元在与程序其他部分相隔离的情况下进行检查和验证的工作&#xff0c;通常指函数或者类&#xff0c;一般是开发完成的。 单元…

CMake:构建时为特定目标运行自定义命令

CMake&#xff1a;构建时为特定目标运行自定义命令 导言项目结构相关源码结果 导言 add_custom_command 是 CMake 中用于添加自定义构建规则的命令&#xff0c;通常用于在编译项目时执行一些自定义操作&#xff0c;例如生成文件、运行脚本等。 项目结构 . ├── CMakeLists…

《web前端开发技术》初识Vue + 第一个 Vue程序:hello world

目录 2.1 Vue 简述 2.1.1 什么是 Vue 2.1.2 为什么选择 Vue 2.2 Vue 的三种安装方式 2.1 Vue 简述 Vue 在 JavaScript 前端开发库领域属于后来者&#xff0c;其他前端开发库有 jQuery、ExtJS、 Anguals、React 等。 2.1.1 什么是 Vue &#x1f636;‍&#x1f32b;️Vue (…

树结构及其算法-二叉树遍历

目录 树结构及其算法-二叉树遍历 一、中序遍历 二、后序遍历 三、前序遍历 C代码 树结构及其算法-二叉树遍历 我们知道线性数组或链表都只能单向从头至尾遍历或反向遍历。所谓二叉树的遍历&#xff08;Binary Tree Traversal&#xff09;&#xff0c;简单的说法就是访问树…