《Java8实战》第7章 并行数据处理与性能

news2025/1/13 13:58:44

7.1 并行流

Stream 接口能非常方便地并行处理其元素:对收集源调用 parallelStream 方法就能将集合转换为并行流。并行流就是一个把内容拆分成多个数据块,用不同线程分别处理每个数据块的流。

public long sequentialSum(long n) { 
 return Stream.iterate(1L, i -> i + 1) // 生成自然数无限流
   .limit(n) // 限制到前n 个数
   .reduce(0L, Long::sum); // 对所有数字求和来归约流
} 

传统的写法
public long iterativeSum(long n) { 
 long result = 0; 
 for (long i = 1L; i <= n; i++) { 
   result += i; 
 } 
 return result; 
} 

7.1.1 将顺序流转换为并行流

调用 parallel 方法,就成了并行流,

public long parallelSum(long n) { 
 return Stream.iterate(1L, i -> i + 1) 
 .limit(n) 
 .parallel() // 将流转换为并行流
 .reduce(0L, Long::sum); 
} 

现在 Stream 由内部被分成了几块。因此能对不同的块执行独立并行的归约操作.
image.png

配置并行流使用的线程池
看看流的 parallel 方法,你可能会想,并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?
并行流内部使用了默认的 ForkJoinPool(7.2节会进一步讲到分支/合并框架),它默认的线程数量就是你的处理器数量,这个值是由 Runtime.getRuntime().availableProcessors()得到的。
但是这并非一成不变,你可以通过系统属性 java.util.concurrent.ForkJoinPool. common.parallelism 来修改线程池大小,如下所示:
System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”,“12”);
这是一个全局设置,因此它会对代码中所有的并行流产生影响。反过来说,目前我们还无法专为某个并行流指定这个值。一般而言,让 ForkJoinPool 的大小等于处理器数量是个不错的默认值,除非你有很充足的理由,否则强烈建议你不要修改它。

7.1.2 测量流性能

并行求和法和顺序、迭代法比较性能。
需要使用名为 Java 微基准套件(Java microbenchmark harness,JMH)的库实现了一个微基准测试。
加入maven依赖

<dependency> 
 <groupId>org.openjdk.jmh</groupId> 
 <artifactId>jmh-core</artifactId> 
 <version>1.17.4</version> 
</dependency> 
<dependency> 
 <groupId>org.openjdk.jmh</groupId> 
 <artifactId>jmh-generator-annprocess</artifactId> 
 <version>1.17.4</version> 
</dependency>
<build> 
 <plugin> 
 <groupId>org.apache.maven.plugins</groupId> 
 <artifactId>maven-shade-plugin</artifactId> 
 <executions> 
 <execution> 
 <phase>package</phase> 
 <goals><goal>shade</goal></goals> 
 <configuration> 
 <finalName>benchmarks</finalName> 
 <transformers> 
 <transformer implementation="org.apache.maven.plugins.shade. 
 resource.ManifestResourceTransformer"> 
 <mainClass>org.openjdk.jmh.Main</mainClass> 
 </transformer> 
 </transformers> 
 </configuration> 
 </execution> 
 </executions> 
 </plugin> 
 </plugins> 
</build>

测量对前 n 个自然数求和的函数的性能

// 测量用于执行基准测试目标方法所花费的平均时间
@BenchmarkMode(Mode.AverageTime)
// 以毫秒为单位,打印输出基准测试的结果
@OutputTimeUnit(TimeUnit.MILLISECONDS)
// 采用 4Gb 的堆,执行基准测试两次以获得更可靠的结果
@Fork(2, jvmArgs={"-Xms4G", "-Xmx4G"})
public class ParallelStreamBenchmark { 
 private static final long N= 10_000_000L; 
// 基准测试的目标方法
 @Benchmark 
 public long sequentialSum() { 
 return Stream.iterate(1L, i -> i + 1).limit(N) 
 .reduce( 0L, Long::sum); 
 } 
// 尽量在每次基准测试迭代结束后都进行一次垃圾回收
 @TearDown(Level.Invocation)
 public void tearDown() { 
   System.gc(); 
 } 
} 

编译这个类时,你之前配置的 Maven 插件会生成一个名为 benchmarks.jar 的 JAR 文件,你可以像下面这样执行它:
java -jar ./target/benchmarks.jar ParallelStreamBenchmark
但是因为出现自动装箱拆箱,所以性能还没有普通的快,而且因为是一个无线流的原因,修改:

@Benchmark 
public long parallelRangedSum() { 
 return LongStream.rangeClosed(1, N) 
 .parallel() 
 .reduce(0L, Long::sum); 
} 

7.1.3 正确使用并行流

错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。

public long sideEffectSum(long n) { 
 Accumulator accumulator = new Accumulator(); 
 LongStream.rangeClosed(1, n).forEach(accumulator::add); 
 return accumulator.total; 
} 
public class Accumulator { 
 public long total = 0; 
 public void add(long value) { total += value; } 
} 

上面的代码 每次访问 total 都会出现数据竞争。

7.1.4 高效使用并行流

  • 如果有疑问,测量。把顺序流转成并行流轻而易举,却不一定是好事。
  • 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8 中有原始类型流(IntStream、LongStream 和 DoubleStream)来避免这种操作,但凡有可能都应该用这些流。
  • 有些操作本身在并行流上的性能就比顺序流差。特别是 limit 和 findFirst 等依赖于元素顺序的操作
  • 还要考虑流的操作流水线的总计算成本。设 N 是要处理的元素的总数, Q 是一个元素通过流水线的大致处理成本,则 N*Q 就是这个对成本的一个粗略的定性估计。
  • 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。
  • 要考虑流背后的数据结构是否易于分解。例如,ArrayList 的拆分效率比 LinkedList高得多
  • 流自身的特点以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。
  • 还要考虑终端操作中合并步骤的代价是大是小(例如 Collector 中的 combiner 方法)。

image.png

7.2 分支/合并框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。

7.2.1 使用 RecursiveTask

要把任务提交到这个池,必须创建 RecursiveTask的一个子类,其中 R 是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是 RecursiveAction类型(当然它可能会更新其他非局部机构)
要定义 RecursiveTask,只需实现它唯一的抽象方法 compute:
protected abstract R compute();

if (任务足够小或不可分) { 
 顺序计算该任务 
} else { 
 将任务分成两个子任务
 递归调用本方法,拆分每个子任务,等待所有子任务完成
 合并每个子任务的结果
}

image.png

现在编写一个方法来并行对前 n 个自然数求和就很简单了。你只需把想要的数字数组传给ForkJoinSumCalculator 的构造函数:

public static long forkJoinSum(long n) { 
 long[] numbers = LongStream.rangeClosed(1, n).toArray(); 
 ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); 
 return new ForkJoinPool().invoke(task); 
} 

这里用了一个 LongStream 来生成包含前 n 个自然数的数组,然后创建一个 ForkJoinTask(RecursiveTask 的父类)
使用多个 ForkJoinPool 是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了
运行 ForkJoinSumCalculator
image.png

7.2.2 使用分支/合并框架的最佳做法

对一个任务调用 join 方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。
不应该在 RecursiveTask 内部使用 ForkJoinPool 的 invoke 方法。
对子任务调用 fork 方法可以把它排进 ForkJoinPool。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用 compute 低。
和并行流一样,你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计算快。

7.2.3 工作窃取

image.png

7.3 Spliterator

Spliterator 是 Java 8中加入的另一个新接口,这个名字代表“可分迭代器”(splitable iterator)。和 Iterator 一样,Spliterator 也用于遍历数据源中的元素,但它是为了并行执行而设计的。

public interface Spliterator<T> { 
 boolean tryAdvance(Consumer<? super T> action); 
 Spliterator<T> trySplit(); 
 long estimateSize(); 
 int characteristics(); 
} 

7.3.1 拆分过程

将 Stream 拆分成多个部分的算法是一个递归过程,如图 7-6 所示。第一步是对第一个Spliterator 调用 trySplit,生成第二个 Spliterator。第二步是对这两个 Spliterator 调用trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit直到它返回 null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的 Spliterator 在调用 trySplit 时都返回了 null。
image.png

7.3.2 实现你自己的 Spliterator

一个迭代式词数统计方法

public int countWordsIteratively(String s) { 
 int counter = 0; 
 boolean lastSpace = true; 
// 逐个遍历 String中的所有字符
 for (char c : s.toCharArray()) { 
   if (Character.isWhitespace(c)) { 
     lastSpace = true; 
   } else { 
// 上一个字符是空格,而当前遍历的字符不是空格时,将单词计数器加一
     if (lastSpace) counter++; 
       lastSpace = false; 
     } 
   } 
 return counter; 
} 
  1. 以函数式风格重写单词计数器

Stream<Character> stream = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt);
image.png

private int countWords(Stream<Character> stream) { 
 WordCounter wordCounter = stream.reduce(new WordCounter(0, true), 
                       WordCounter::accumulate, 
                       WordCounter::combine); 
 return wordCounter.getCounter(); 
} 
  1. 让 WordCounter 并行工作

image.png

7.4 小结

  • 内部迭代让你可以并行处理一个流,而无须在代码中显式使用和协调不同的线程。
  • 虽然并行处理一个流很容易,但是不能保证程序在所有情况下都运行得更快。并行软件的行为和性能有时是违反直觉的,因此一定要测量,确保你并没有把程序拖得更慢。
  • 像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,或处理单个元素特别耗时的时候。
  • 从性能角度来看,使用正确的数据结构,如尽可能利用原始流而不是一般化的流,几乎总是比尝试并行化某些操作更为重要。
  • 分支/合并框架让你得以用递归方式将可以并行的任务拆分成更小的任务,在不同的线程上执行,然后将各个子任务的结果合并起来生成整体结果。
  • Spliterator 定义了并行流如何拆分它要遍历的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/435943.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

跌倒检测和识别1:跌倒检测数据集(含下载链接)

跌倒检测和识别1&#xff1a;跌倒检测数据集(含下载链接) 目录 跌倒检测和识别1&#xff1a;跌倒检测数据集(含下载链接) 1. 前言 2. 跌倒姿态&#xff1a;站立-弯腰(蹲下)-躺下 3. 跌倒检测数据集&#xff1a; &#xff08;1&#xff09;Fall-Down-Det-v1 &#xff08;2…

k8s client-go 程序实现kubernetes Controller Operator 使用CRD 学习总结

k8s client-go 程序实现kubernetes Controller & Operator 使用CRD 学习总结 大纲 1 定义CRD2 client-go自动代码生成3 client-go操作CR4 创建镜像5 配置权限6 部署到k8s 基础流程 这里使用client-go实现编写&#xff0c;相对于kubebuiler这些工具生成脚手架工程要麻烦…

学习零碎-txt转json

import re import jsondef txtToJson():# 文件路径path "./prot.txt"# 读取文件with open(path, r, encoding"utf-8") as file:# 定义一个用于切割字符串的正则# seq re.compile(":")result []# 逐行读取for line in file:lst line.split(#)…

面试官:“你会组件化开发操作吗?它的优势在哪?”

随着 Android 版本的不断更新升级和用户对 APP 产品需求技术越来越高&#xff0c;相对的各大公司对 Android 开发者们设置的招聘门槛也越来越高。 至于如何去看一个开发者水平的高低&#xff0c;一般看面试官会怎么问&#xff0c;会问哪些部分的技术内容&#xff1f; 一般公司…

【AI前沿】chatgpt还有哪些不足?

博客昵称&#xff1a;吴NDIR 个人座右铭&#xff1a;得之淡然&#xff0c;失之坦然 作者简介&#xff1a;喜欢轻音乐、象棋&#xff0c;爱好算法、刷题 其他推荐内容&#xff1a;计算机导论速记思维导图 其他内容推荐&#xff1a;五种排序算法 在这个愉快的周末让我们聊一下Cha…

【C语言】文件的输入与输出

在此之前&#xff0c;我极少使用C语言处理文件。因为我认为使用Python、matlab处理文件是及其方便的。 事实果真如此吗&#xff1f; 文章目录 一、与文件进行通信1.1 文件的定义1.2 文本文件和二进制文件1.3 底层 I/O 和 标准I/O1.4 标准文件1.5 标准 I/O 二、文件的打开和关闭…

flume 的Channel的种类

目录 1、MemoryChannel 2、FileChannel 3、KafkaChannel Flume拦截器 消息队列传输消息 1、MemoryChannel 数据放在内存中,会在Flume宕机的时候丢失数据,可以⽤在对数据安全性要求没有那么⾼的场景中⽐如⽇志数据。 2、FileChannel 不会丢失数据,因为数据是放在磁盘上边的…

前端使用国密SM4进行加密

目录 需求【方法1】 - 使用 sm4util 依赖【方法2】sm4.js引入1. /public/sm4.js2. body 标签上引入该文件3. 使用 - ECB 模式加密 【方法3】1. 本地写 js 文件2. 使用 - ECB 模式加解密 需求 前端/后端使用 国密SM4 进行加密/解密&#xff0c; 【注意】前后端配合加解密时&…

【数据结构】线性表之——“顺序表”

文章目录 前言顺序表主体结构顺序表操作函数介绍顺序表操作函数实现实现顺序&#xff1a;顺序表的初始化&#xff1a;顺序表插入函数&#xff1a;头插尾插指定位置插入 顺序表打印函数查找顺序表数据顺序表删除函数头删尾删指定位置删除 修改顺序表销毁顺序表 文件分类test.cSe…

webpack5搭建react框架-生产环境配置

webpack5配置react基础生产环境 一、前言 在项目构建时不同的环境下会有不同配置&#xff0c;在前面文章中已经使用webpack5配置好了基础环境和开发环境&#xff0c;但是在生产环境时有些配置和开发环境是不需要的&#xff0c;有些是可以在优化的&#xff0c;所以下面继续生产…

分支和循环语句——1

老铁们&#xff0c;这是博主初识C之后的第一篇C语言学习博客&#xff0c;希望可以给你们带来帮助。 文章目录 一、什么是语句? 二、分支语句 1、if语句 2、switch语句 三、while循环 一、什么是语句? C语句可分为以下五类&#xff1a; 1. 表达式语句 2. 函数调用语句…

模拟不同MIMO-OFDM方案的MATLAB代码(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果​ &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 MIMO技术指在发射端和接收端分别使用多个发射天线和接收天线&#xff0c;使信号通过发射端与接收端的多个天线传送和接收&…

杭州旭航集团,申请纳斯达克IPO上市,募资9800万美元

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 猛兽财经获悉&#xff0c;为中国企业提供数字内容营销服务的杭州旭航网络科技有限公司的控股公司Xuhang Holdings Ltd(以下简称&#xff1a;旭航集团)&#xff0c;近期已向美国证券交易委员会&#xff08;SEC&#xff09;提…

随想录Day53--动态规划: 1143.最长公共子序列 ,1035.不相交的线 , 53. 最大子序和

1143.最长公共子序列,这题要画一个二维数组&#xff0c;用两层for循环来遍历每个字符&#xff0c;从而比较是否相等。用dp[i][j]来表示当遍历到text2的第i个字符和text1的第j个字符时&#xff0c;最长的公共子序列为多少。比如说两个字符串&#xff08;“abcde”和“ace”&…

怎么样成为一名Python工程师?到底要会哪些东西?你会了多少?

目录 重点&#xff1a;爬虫部分项目、源码展示python数据分析可视化大屏看板python爬虫爬取淘宝卤鸭货商品数据python游戏开发python自动化办公 重点&#xff1a; 1、做一名程序员&#xff0c;绝对要耐得住寂寞&#xff0c;并且要一直有点兴趣促进你学习。如果你完全没兴趣&am…

electron+vue3全家桶+vite项目搭建【十】vite路径取别名、多环境相关配置

文章目录 引入1.路径取别名配置2.测试别名配置3.环境变量配置4.验证环境变量配置 引入 我们之前写代码的时候用相对路径不是很方便&#xff0c;并且所有环境共用同一套配置也不太好&#xff0c;接下来我们通过vite配置一下路径别名和环境变量 视频讲解 vite官网 demo项目地址…

DIN论文翻译

摘要 在电子商务行业&#xff0c;利用丰富的历史行为数据更好地提取用户兴趣对于构建在线广告系统的点击率(CTR)预测模型至关重要。关于用户行为数据有两个关键观察结果&#xff1a;i) 多样性(diversity)。用户在访问电子商务网站时对不同种类的商品感兴趣。ii) 局部激活(local…

Linux驱动之GPIO函数、IO内存映射、混杂设备驱动

之前学习完了字符设备驱动的大体框架&#xff0c;现在我们就使用这个基本的框架来对硬件进行操作&#xff0c;例如通过指令控制led的状态&#xff0c;编写LED驱动。LED驱动有多种实现方式。 目录 GPIO函数 IO内存映射 混杂设备驱动 GPIO函数 首先加入需要的头文件。 #incl…

欧盟立法者签署公开信,近万人联名“暂停高级AI研发”

来源丨CoinTelegraph 编辑丨liuruiWeb3CN.Pro ChatGPT 曾经的势头有多猛烈如今就被行业大佬抵制的就有多严重。 近日&#xff0c;十几位欧盟 (EU) 政客签署了“暂停高级AI研发”的公开信&#xff0c;呼吁 AI &#xff08;人工智能&#xff09;的“安全”发展&#xff0c;特斯拉…

【Android -- 软技能】聊聊高效开发的一些套路与实践

前言 在开发中&#xff0c;编码我们有分层架构、设计模式做为套路来高效开发&#xff0c;但你也知道编码不是开发的全部&#xff0c;一个完全的开发流程用面向对象思想来概括&#xff0c;它分为OOA&#xff08;面向对象分析&#xff09;、OOD&#xff08;面向对象设计&#xf…