Stream
- Stream 的核心概念
- 核心特点
- Stream 的操作分类
- 中间操作(Intermediate Operations)
- 终止操作(Terminal Operations)
- Stream 的流分类
- 顺序流(Sequential Stream)
- 并行流(Parallel Stream)
- 并行流的注意事项
- 并行流的底层机制
- 顺序流 vs 并行流的对比
- 顺序流和并行流的示例代码
- 顺序流并行流总结
Stream 的核心概念
Java 8 引入的 Stream API 是一种基于函数式编程的数据处理抽象,允许以声明式方式操作集合(如过滤、映射、排序等)。Stream 不是数据结构,而是对数据源(集合、数组、I/O 等)的高效计算工具
核心特点
- 链式调用:通过多个操作(中间操作 + 终止操作)串联处理数据
- 惰性求值:中间操作(如 filter, map)不会立即执行,直到遇到终止操作(如 collect, forEach)
- 不可复用:一个 Stream 只能被消费一次,再次使用会抛出 IllegalStateException
- 并行处理:可通过简单方法(parallel())实现多线程并行计算
Stream 的操作分类
中间操作(Intermediate Operations)
返回新的 Stream,支持链式调用,常见方法示例:filter(), map(), sorted(), distinct()。
List<Integer> list = Arrays.asList(1, 2, 2,3);
list.stream().distinct().forEach(System.out::println);//用于去除流中的重复元素。
list.stream().map(x -> x * x).forEach(System.out::println);//用于对流中的每个元素应用一个函数,并返回一个新的流
list.stream().filter(v->v==3).forEach(System.out::println);//用于对流中的元素进行筛选,只保留满足条件的元素
list.stream().flatMap(s -> s.toString().chars().boxed()).forEach(System.out::println);//用于将流中的每个元素转换为另一个流,然后将这些流连接成一个流
list.stream().limit(2).forEach(System.out::println);//用于限制流中元素的数量
list.stream().skip(2).forEach(System.out::println);//用于跳过流中前n个元素
list.stream().sorted().forEach(System.out::println);//用于对流中的元素进行排序
支持链式调用
list.stream().distinct()
.filter(v->v==3)
.sorted()
.map(x -> x * x).forEach(System.out::println);
但是注意,流中间操作的方法返回的结果依然是流,但是上面为了显示这些方法使用的样例里面
forEach(System.out::println)这个是终止操作方法,也就是在forEach方法之前的返回的是流,在使用forEach方法之后返回的又将流转换非 Stream 结果
终止操作(Terminal Operations)
触发实际计算,返回非 Stream 结果(如集合、数值、void),常见方法示示例:collect(), forEach(), reduce(), count()
List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
names.stream().forEach(System.out::println);//对流中的每个元素执行指定的操作 这里是打印
names.stream() .map(String::toUpperCase).collect(Collectors.toList()).forEach(System.out::println);//将流中的元素收集到一个容器中
System.out.println(names.stream().allMatch(s -> s.contains("s")));//allMatch 检查是否匹配所有元素
System.out.println(names.stream().anyMatch(s -> s.contains("a")));//anyMatch 检查是否至少匹配一个元素
System.out.println(names.stream().noneMatch(s -> s.contains("x")));//noneMatch:检查是否没有匹配所有元素
System.out.println(names.stream().findFirst());//findFirst:返回当前流中的第一个元素
System.out.println(names.stream().findAny());//findAny:返回当前流中的任意元素
System.out.println(names.stream().count());//count:返回流中元素总数
System.out.println(names.stream().max(Comparator.comparingInt(s->s.length())));//max:返回流中最大值
System.out.println(names.stream().min(Comparator.comparingInt(s->s.length())));//min:最小值
Stream 的流分类
顺序流(Sequential Stream)
默认模式:所有操作在单线程中按顺序执行
List<Integer> list = Arrays.asList(1, 2, 3);
Stream<Integer> stream = list.stream(); // 顺序流
适用场景:
数据量较小,或操作本身简单。
需要保证操作顺序(如 sorted() 依赖前序操作结果)。
并行流(Parallel Stream)
Stream<Integer> parallelStream = list.parallelStream(); // 并行流
或
Stream<Integer> parallelStream = list.stream().parallel(); // 转换为并行流
适用场景:
数据量较大,且任务可独立拆分(无共享状态或顺序依赖)。
操作耗时(如复杂计算、I/O 等待)。
并行流的注意事项
1.线程安全问题
确保操作中使用的 Lambda 表达式或函数是线程安全的(避免共享可变状态)。
示例错误:
List<Integer> result = new ArrayList<>();
list.parallelStream().forEach(result::add); // 并发修改 ArrayList 导致数据错误
应使用线程安全的收集器(如 Collectors.toList()):
List<Integer> result = list.parallelStream().collect(Collectors.toList());
2.性能未必更好
并行流需额外开销(任务拆分、线程调度),对小数据量可能更慢。
测试性能:通过基准测试(如 JMH) 验证是否适合并行
3.顺序依赖操作
如 limit()、findFirst() 在并行流中可能性能更差,需改用无序流
list.parallelStream().unordered().limit(10); // 提升性能
ORDERED是Spliterator的特征值, 特征要求并行流保持元素顺序(如 List),改成unordered就是改成序列流,除此之外
并行流的底层机制
1.Spliterator
并行流通过 Spliterator(可拆分迭代器) 将数据源拆分为多个子任务,是 Stream API 并行处理的底层实现
List<String> list = Arrays.asList("Java", "Python", "C++", "Go");
// 使用并行流处理
list.parallelStream() // 将数据源拆分为多个子任务
.map(String::toUpperCase)
.forEach(System.out::println);
// 输出(顺序不确定,因为并行处理):
// PYTHON
// JAVA
// GO
// C++
2.ForkJoinPool
并行流默认使用公共的 ForkJoinPool(线程数 = CPU 核心数)。通过系统属性修改默认线程池大小
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
3.自定义线程池(避免影响全局)
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> list.parallelStream().forEach(...)).get();
顺序流 vs 并行流的对比
特性 | 顺序流 | 并行流 |
---|---|---|
线程模型 | 单线程 | 多线程(ForkJoinPool 默认线程池) |
性能优势 | 简单任务、小数据量 | 大数据量、可并行化的复杂任务 |
开销 | 低 | 高(线程切换、任务拆分/合并) |
顺序保证 | 严格按顺序处理 | 不保证顺序(除非使用有序操作) |
数据源要求 | 无特殊要求 | 数据源需可拆分(如 ArrayList ) |
顺序流和并行流的示例代码
顺序流处理
List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
List<String> filtered = names.stream()
.filter(name -> name.length() > 3)
.map(String::toUpperCase)
.collect(Collectors.toList()); // [ALICE, CHARLIE]
并行流处理
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = numbers.parallelStream()//通过底层Spliterator拆分为多个子任务
.filter(n -> n % 2 == 0)
.mapToInt(n -> n * 2)
.sum(); // (2+4+6+8+10)*2 = 60
顺序流并行流总结
顺序流:简单、低开销,适合小数据量或顺序敏感操作。
并行流:通过多线程加速处理,适合大数据量和可并行化任务,但需注意线程安全和性能开销。
选择策略:
优先使用顺序流,仅在必要时(且验证有效)切换为并行流。
避免在并行流中操作共享可变状态。