Java8中的Stream流

news2025/1/5 0:03:58

定义

什么是Stream流,Java doc中是这样写的

A sequence of elements supporting sequential and parallel aggregate operations

翻译一下就是一个支持顺序和并行聚合操作的元素序列。
可以把它理解成一个迭代器,但是只能遍历一次,就像是流水一样,要处理的元素在流中传输,并且可以在流中设置多个处理节点,元素在经过每个节点后会被节点的逻辑所处理。比如可以进行过滤、排序、转换等操作。

Stream流的使用可以分为三个步骤:

  • 数据源,创建流
  • 中间操作,可以有多个,生成一个新的流
  • 终端操作,只能有一个,放在最后,代表流中止。

Stream流有几个特点:
1、Stream流一般不会改变数据源,只会生成一个新的数据流。
2、Stream流不会存储数据,只会根据设置的操作节点处理数据。
3、Stream流是延迟执行的,只有在调用终端操作后才会进行流转。

看一下Stream的结构
stream

使用

数据源生成流

  • 如果是集合的话,可以直接使用stream()创建流。
  • 如果是数组的话,可以使用Arrays.stream()Stream.of()来创建流。
// 集合生成流
List<String> strList = new ArrayList<>();
Stream<String> stream = strList.stream();

//数据生成流
String[] strs = new String[]{"1","2","3"};
Stream<String> stream1 = Arrays.stream(strs);
Stream<String> stream2 = Stream.of(strs);

中间操作

在上边Stream定义中,返回是Stream类型的大多数都是中间操作,入参大多数都是函数式编程,不熟悉的可以看看这篇Java函数式编程。常用的中间操作有

  • 过滤操作 filter()
Arrays.stream(strs).filter(s -> s.equals("1"));
  • 排序操作 sorted()
Arrays.stream(strs).sorted();
  • 去重操作 distinct()
Arrays.stream(strs).distinct();
  • 映射操作,将流中元素转换成新的元素
    • mapToInt()转换成Integer类型
    • mapToLong()转换成Long类型
    • mapToDouble()转换成Double类型
    • map() 自定义转换类型,这是一个使用频率非常高的方法。
//将字符串转换成Integer
Arrays.stream(strs).mapToInt(s -> Integer.valueOf(s));
//将字符串转换成Long
Arrays.stream(strs).mapToLong(s -> Long.valueOf(s));
//将字符串转换成Doublde
Arrays.stream(strs).mapToDouble(s -> Double.valueOf(s));
//自定义转换的类型
Arrays.stream(strs).map(s -> new BigDecimal(s));

中间操作是可以有多个的,我们可以根据业务功能组合多个中间操作,比如求数组中字符串包含s的字符串长度排序

Arrays.stream(strs).filter(e->e.contains("s")).map(String::length).sorted();

终端操作

终端操作,表示结束流操作,是在流的最后,常用的有

  • 统计 count()
long count = Arrays.stream(strs).count();
// count=3
  • 获取最小值 min()
// 将字符串转换成Interger类型再比较大小
 OptionalInt min = Arrays.stream(strs).mapToInt(Integer::valueOf).min();
 System.out.println(min.getAsInt());
 // 1
  • 获取最大值 max()
 OptionalInt max = Arrays.stream(strs).mapToInt(Integer::valueOf).max();
 System.out.println(max.getAsInt());
 // 3
  • 匹配
    • anyMatch(),只要有一个匹配就返回true
    • allMatch(),只有全部匹配才返回true
    • noneMatch(),只要有一个匹配就返回 false
boolean all = Arrays.stream(strs).allMatch(s -> s.equals("2"));
boolean any = Arrays.stream(strs).anyMatch(s -> s.equals("2"));
boolean none = Arrays.stream(strs).noneMatch(s -> s.equals("2"));
// all = false
// any = true
// none = false
  • 组合 reduce()将Stream 中的元素组合起来,有两种用法
    • Optional reduce(BinaryOperator accumulator) 没有起始值只有运算规则
    • T reduce(T identity, BinaryOperator accumulator),有运算起始值和运算规则、返回的是和起始值一样的类型
Integer[] integers = new Integer[]{1,2,3};
Optional<Integer> reduce1 = Arrays.stream(integers).reduce((i1, i2) -> i1 + i2);
Integer reduce2 = Arrays.stream(integers).reduce(100, (i1, i2) -> i1 + i2);
// reduce1.get() = 6
// reduce2 = 106
  • 转换 collect(),转换作用是将流再转换成集合或数组,这也是一个使用频率非常高的方法。
    collect()一般配合Collectors使用,Collectors 是一个收集器的工具类,内置了一系列收集器实现,比如toList() 转换成list集合,toMap()转换成Map,toSet()转换成Set集合,joining() 将元素收集到一个可以用分隔符指定的字符串中。
String[] strs = new String[]{"11111", "222", "3"};
//统计每个字符串的长度
List<Integer> lengths = Arrays.stream(strs).map(String::length).collect(Collectors.toList());
String s = Arrays.stream(strs).collect(Collectors.joining(","));
// lengths=[5,3,1]
// s = 11111,222,3

合理的组合Steam操作,可以很大的提升生产力

原理


Stream的实现类中,将Stream划分成了HeadStatelessOpStatefulOpHead控制数据流入,中间操作分为了StatelessOpStatefulOp

StatelessOp代表无状态操作:每个数据的处理是独立的,不会影响或依赖之前的数据。像filter()map()等。

StatefulOp代表有状态操作::处理时会记录状态,比如后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去等这样有状态的操作,像sorted()

现在已下面代码为例,分析一下Stream的原理

 list.stream()
     .filter(e -> e.length() > 1)
     .sorted()
     .filter(e -> e.equals("333"))
     .collect(Collectors.toList());

数据源生成流

首先,进入到list.stream()

//Collection#stream

 default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
 }

default Spliterator<E> spliterator() {
    return Spliterators.spliterator(this, 0);
 }

//StreamSupport#stream
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

将原数据封装成Spliterator,同时生成一个Head,将Spliterator放到Head中。

中间操作

接着分析中间操作.filter(e -> e.length() > 1)的代码

//ReferencePipeline#filter
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                  StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

返回的是一个无状态操作StatelessOp,查看StatelessOp的构造函数

// AbstractPipeline#AbstractPipeline
  AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
      if (previousStage.linkedOrConsumed)
          throw new IllegalStateException(MSG_STREAM_LINKED);
      previousStage.linkedOrConsumed = true;
      previousStage.nextStage = this;

      this.previousStage = previousStage;
      this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
      this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
      this.sourceStage = previousStage.sourceStage;
      if (opIsStateful())
          sourceStage.sourceAnyStateful = true;
      this.depth = previousStage.depth + 1;
  }

构造函数中有previousStage.nextStage = this;this.previousStage = previousStage;,相当于将当前的StatelessOp操作拼接到Head后面,构成了一条双向链表。

再看后面的.sorted().filter(e -> e.equals("333")).limit(10),也会将操作添加到了双向链表后面。.sorted()在链表后面添加的是StatefulOp有状态操作。

终端操作

最后走到终端操作.collect(Collectors.toList())。进入到collect()

//ReferencePipeline#collect
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
            && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
            ? (R) container
            : collector.finisher().apply(container);
}

并发操作先不看,直接看container = evaluate(ReduceOps.makeRef(collector));ReduceOps.makeRef()返回是TerminalOp,代表的是终端操作。

evaluate()

//AbstractPipeline#evaluate
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
  assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
            ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

先不管并行,进串行入evaluateSequential()

//ReduceOps#evaluateSequential
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                    Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

makeSink()将返回一个Sink实例,并作为参数和 spliterator 一起传入最后一个节点(terminalOp)的 wrapAndCopyInto() 方法

//AbstractPipeline#wrapAndCopyInto
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
     Objects.requireNonNull(sink);

    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

wrapSink()将最后一个节点创建的 Sink 传入,并且看到里面有个 for 循环。这个 for 循环是从最后一个节点开始,到第二个节点结束。每一次循环都是将上一节点的 combinedFlags 和当前的 Sink 包起来生成一个新的 Sink 。这和前面拼接各个操作很类似,只不过拼接的是 Sink 的实现类的实例,方向相反。

到现在整个流水已经拼接完成。真正的数据处理在copyInto()中。

//AbstractPipeline#copyInto
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

Sink中有三个方法:

  • begin:节点开始准备
  • accept: 节点处理数据
  • end: 节点处理结束

Sink与操作是相关的,不同的Sink有不同的职责,无状态操作的 Sink 接收到通知或者数据,处理完了会马上通知自己的下游。有状态操作的 Sink 则像有一个缓冲区一样,它会等要处理的数据处理完了才开始通知下游,并将自己处理的结果传递给下游。

比如filter这种无状态的操作,处理完数据会直接交给下游,而像sorted这种无有状态的操作在begin阶段会先创建一个容器,accept会将流转过来的数据保存起来,最后在执行 end方法时才正在开始排序。排序之后再将数据,采用同样的方式依次传递给下游节点。

wrapAndCopyInto() 返回了 TerminalOps 创建的 Sink,这时候它里面已经包含了最终处理的结果。调用它的 get() 方法就获得了最终的结果。

Steam还可以支持并行流,把list.stream()换成list.parallelStream()即可使用并行操作。

并行过程中,构建操作链的双向链表是不变的,区别实在构建完后的操作

//AbstractPipeline#evaluate
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
  assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
            ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

这次进入到 evaluateParallel()

//ReduceOps#evaluateSequential
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

ReduceTask继承自ForkJoinTaskSteam的并行底层用的是ForkJoin框架。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/28617.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nodejs核心模块之Events

核心模块之Events 通过EventEmitter类实现事件统一管理 events与EventEmitter node.js是基于事件驱动的异步操作架构&#xff0c;内置events模块events模块提供了EventEmitter类node.js中很多内置核心模块集成EventEmitter EventEmitter常见Api on 添加实现被触发时调用的…

学生静态HTML个人博客主页【Web大学生网页作业成品】HTML+CSS+JavaScript

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

【没用的小知识又增加了--CCS】

1.CCS中导入工程时提示overlaps the location of another project问题 ​ ​ 工作区要选择最外面的文件夹 ​ 2. error #131: expected a "{" error: #130: expected a "{"_kuyoungest的博客-CSDN博客如果该提示定位到文件开头的语句&#xff0c;则应在…

【Spring(四)】Spring基于注解的配置方式

有关Spring的所有文章都收录于我的专栏&#xff1a;&#x1f449;Spring&#x1f448; 目录 一、前言 二、基于注解需要的依赖 三、通过注解来配置Bean 四、注解配置Bean再补充 五、基于注解的自动装配 六、泛型依赖注入 相关文章 【Spring&#xff08;一&#xff09;】如何获取…

企业知识管理难?选对系统可解决90%的问题

编者按&#xff1a;知识管理是企业加强竞争优势和核心竞争力的保证。本文分析了企业知识管理中遇到的困难&#xff0c;并进一步提出了解决方案——天翎KMS群晖云盘一体机。 关键词&#xff1a;在线预览&#xff0c;在线编辑&#xff0c;权限管理&#xff0c;水印设置&#xff…

macOS Ventura13.0.1解决office缺少“宋体”等问题。安装微软雅黑、宋体等字体。

最近在弄项目验收文档&#xff0c;文档格式要求宋体&#xff0c;用微软的Word打开文件保存时经常提示&#xff0c;系统不存在宋体字体&#xff0c;查了下是是Mac系统本身不存在该字体导致的&#xff0c;下载该字体&#xff0c;然后通过字体册安装就行。 我打包成压缩包了具体有…

【易错小点记录】坑人的for循环与逻辑或

目录 1.题目 1.1.以下for循环的执行次数是&#xff08; &#xff09; 1.1.1.题目分析 1.1.2.题目答案 1.2.下列main()函数执行后的结果为&#xff08;&#xff09; 1.2.题目分析 1.3.题目答案 2.题目 2.1.下面程序输出是什么&#xff1f;&#xff08; &#xff09; 2.…

非线性海洋捕食者算法(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客 &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜…

树表的查找

二叉排序树 二叉排序树&#xff08;BST&#xff09;又称二叉搜索树&#xff0c;其满足以下性质&#xff1a; &#xff08;1&#xff09;若根节点的左子树非空&#xff0c;则左子树上的所有节点关键字均小于根节点的关键字。 &#xff08;2&#xff09;若根节点的右子树非空&a…

补充(二)古典密码两张思维导图速通

目录 目录 古典密码思维导图 古典密码分析思维导图 唯密文分析古典密码 单表代替密码 棋盘密码 曾公密码 置换密码的代表&#xff1a;斯巴达人的密码棒 古典密码思维导图 古典密码分析思维导图 唯密文分析古典密码 最困难的分析条件通常需要用到英文字母的频率分析和反…

【微服务】SpringCloud中Ribbon集成Eureka实现负载均衡

&#x1f496; Spring家族及微服务系列文章 ✨【微服务】SpringCloud轮询拉取注册表及服务发现源码解析 ✨【微服务】SpringCloud微服务续约源码解析 ✨【微服务】SpringCloud微服务注册源码解析 ✨【微服务】Nacos2.x服务发现&#xff1f;RPC调用&#xff1f;重试机制&#xf…

Maven打Jar包,启动报NoClassDefFoundError错误

今天准备将游戏服务器的压测机器人打包分发给其他人来运行对服务器进行压力测试。打成的jar包发现运行报错了。找了半天才找到最终原因。下面是原因和一些分析的情况。 原因 java -jar .\robot.jar发现错误如下 看到这个错误就知道jvm找不到对应的类。但是为什么找不到对应的…

JVM的内存区域划分

文章目录 前言一、本地方法栈&#xff08;线程私有&#xff09;二、程序计数器&#xff08;线程私有&#xff09;三、Java虚拟机栈&#xff08;线程私有&#xff09;四、堆&#xff08;线程共享&#xff09;五、方法区&#xff08;元数据区&#xff09;前言 JVM 是Java 运行的基…

Android:Navigation使用safe args插件传递参数

Navigation使用safe args插件传递参数1、 使用配置2、举例说明1、MainActivity2、AvalFragment, DovomFragment2.1、AvalFragment2.2、DovomFragment参考1、 使用配置 afe args与传统传参方式相比&#xff0c;好处在于安全的参数类型&#xff0c;并且通过谷歌官方的支持&#…

GameFrameWork框架(Unity3D)使用笔记(六)游戏主流程ProcedureMain——从数据表加载出所需实体

目录 前言&#xff1a; 一、Entity配置表 1、创建数据表 2、创建数据表行类 二、Character配置表 1、创建数据表 2、写数据表行类 三、加载数据表 四、扩展一下Entity模块 五、应用Character数据表的位置信息 六、测试 总结&#xff1a; 前言&#xff1a; 上一篇中我…

第2章 Elasticsearch入门

2.1 Elasticsearch 安装 2 . 1 .1 下载软件 Elasticsearch的官方地址&#xff1a;www.elastic.co/cn/ Elasticsearch最新的版本是7.11.2&#xff08;截止2021.3.10&#xff09;&#xff0c;我们选择7.8.0版本&#xff08;最新版本半年前的版本&#xff09; 下载地址&#x…

贝叶斯网络

贝叶斯网络的独立性&#xff1a; 当一个结点G的父节点已知的时候&#xff0c;该结点G与其所有非后代结点条件独立 交叉因果推断&#xff1a;如上述图中的例子,对于P&#xff08;i | g | d&#xff09;等于说是中D到I 这条路径中&#xff0c;做半边的路径是顺着箭头走的&#x…

表白墙(前端+后端+数据库)

目录 一、创建项目 1、创建maven项目&#xff0c;引入依赖 2、创建目录结构 二、前端代码 1、页面内容和样式 2、提交按钮的点击事件 3、发送GET请求 三、数据库 四、后端代码 1、重写doPost方法 1.1 创建Message类 1.2 重写doPost方法 1.3 实现save方法 2、重写…

你需要知道的50颗卫星:地球卫星清单

开放数据卫星 1陆地卫星 地球资源卫星令人难以置信的长期遗产已经保存了地球40多年的历史。通过无数的应用程序&#xff0c;它甚至发现 island Landsat in Canada。 图片来源&#xff1a;NASA 2哨兵 作为 Copernicus Programme 哨兵的6个任务的舰队是一个游戏改变者。明确地…

2022年经典散文:滚烫的石板

滚烫的石板 ——灵遁者 此刻&#xff0c;我想表达的情愫大概有千万种&#xff0c;如何表达并不容易&#xff0c;就好像一个人的时候&#xff0c;也在面对某个我认识或者不认识的人&#xff0c;话总是说不清&#xff0c;也说不出来。 小孩总是敢于表达的&#xff0c;就像一条没…