Java 8之后我们对Stream的使用都已经习以为常了,它帮助我们从怎么做的细节里脱身,只要告诉它做什么即可。这一篇文章我们主要讲Java Stream的实现原理,手写一个Stream框架,然后再来讲解Java Stream的核心类,做到知其然知其所以然。
我们先看一个Stream简单用例,输入一堆字符串创建流(of),取前5个(limit)、操作每个字符串取首字母(map),最后用终结操作符收集所有的首字母到List中(collect)。
List<Character> data = Stream.of("a", "b", "c", "d", "e", "f", "g").limit(5).map(x -> x.charAt(0))
.collect(Collectors.toList());
System.out.println(data);
1. 创建Stream
假设我们要手写一个Stream类,你会怎么实现呢? 要实现Stream.of其实很简单,只要将of提供的参数保存在一个类实例中即可,我们定义了一个ArrayStream来实现这个动作
public static interface MyStream<T> {
static <T> MyStream<T> of(T... ts) {
return new OfStream<>(ts);
}
}
public static class ArrayStream<T> implements MyStream<T> {
private int fromIdx;
private int toIdx;
private T[] data;
public ArrayStream(T... ts) {
this.data = ts;
this.fromIdx = 0;
this.toIdx = ts.length;
}
}
2. 支持limit
在ArrayStream的基础上支持limit操作就很容了,只要调整fromIdx、toIdx的值就即可。不过这里我们通过新定义一个类SliceStream来实现,在limit方法中重新创建SliceStream对象。
public static class SliceStream<T> implements MyStream<T> {
private T[] data;
private int fromIdx;
private int toIdx;
public SliceStream(T[] data, int fromIdx, int toIdx) {
this.data = data;
this.fromIdx = fromIdx;
this.toIdx = toIdx;
}
@Override
public MyStream<T> limit(int n) {
return new SliceStream<T>(this.data, fromIdx, Math.min(fromIdx + n, toIdx));
}
}
在ArrayStream的limit也要使用SliceStream的实例。有的同学可能会疑惑为什么不直接用ArrayStream,修改fromIdx/toIdx,这个案例是可以的,新增类只是为了模拟JDK实现的策略。
public static class ArrayStream<T> implements MyStream<T> {
...
@Override
public MyStream<T> limit(int n) {
return new SliceStream<>(this.data, 0, Math.min(data.length, n));
}
...
}
3. 支持map
照着limit的实现如法炮制,我们定义一个MappedStream类,持有转换函数(Funtion),持有下游MyStream,后续读取MyStream时,先用转换函数操作数据。我们来看下代码
public static class MappedStream<T, R> implements MyStream<R> {
private Function<T, R> apply;
private MyStream<T> stream;
public MappedStream(MyStream<T> stream, Function<T, R> apply) {
this.stream = stream;
this.apply = apply;
}
...
@Override
public void forEach(Consumer<R> consumer) {
stream.forEach((T t) -> {
R r = apply.apply(t);
consumer.accept(r);
});
}
}
在ArrayStream的map方法中,我们讲ArrayStream包装到MappedStream后再返回
public static class ArrayStream<T> implements MyStream<T> {
...
@Override
public <R> MyStream<R> map(Function<T, R> map) {
return new MappedStream<>(this, map);
}
...
}
4. 支持collect
Java Stream讲这类操作称为终结操作,它是实际动作发生的位置。我们定义一个Collector接口,supplier生成中间结果对象,accumulate应用每个元素到中间结果,finisher转换为最终结果
public static interface Collector<T, A, R> {
public Supplier<A> supplier();
public void accumulate(BiConsumer<T, R> consumer);
Function<A, R> finisher();
}
下面是Collector的一个实现,在Collector接口提供一个工厂方法(toList),方便将stream转为list
public static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
private Supplier<A> supplier;
private BiConsumer<T, A> accumulate;
private Function<A, R> finisher;
public CollectorImpl(Supplier<A> supplier, BiConsumer<T, A> accumulate, Function<A, R> finisher) {
this.supplier = supplier;
this.accumulate = accumulate;
this.finisher = finisher;
}
@Override
public Supplier<A> supplier() {
return supplier;
}
@Override
public BiConsumer<T, A> accumulate() {
return accumulate;
}
@Override
public Function<A, R> finisher() {
return finisher;
}
}
public static interface Collector<T, A, R> {
static <T> CollectorImpl<T, List<T>, List<T>> toList() {
Supplier<List<T>> supplier = ArrayList<T>::new;
BiConsumer<T, List<T>> accumulate = (T t, List<T> ls) -> ls.add(t);
Function<List<T>, List<T>> finisher = Function.identity();
return new CollectorImpl<>(supplier, accumulate, finisher);
}
}
在ArrayStream里支持通过collect方法终结操作
public static class ArrayStream<T> implements MyStream<T> {
...
@Override
public <A, W> W collect(Collector<T, A, W> collector) {
Supplier<A> supplier = collector.supplier();
BiConsumer<T, A> consumer = collector.accumulate();
Function<A, W> finisher = collector.finisher();
A middle = supplier.get();
for (int i = fromIdx; i < toIdx; i++) {
T item = data[i];
consumer.accept(item, middle);
}
return finisher.apply(middle);
}
...
}
测试代码如下,查看输出可以确定我们的实现是正常运作的
public static void main(String[] args) {
List<Character> data = MyStream.of("a", "b", "c", "d", "e", "f", "g").limit(5).map(x -> x.charAt(0)).collect(Collector.toList());
System.out.println(data);
}
这个手写stream框架的类图是这个样子的,主要涉及了6个类
5. JDK内置实现
上面我们手写了简单的Stream框架,目的是了解它的工作原理。JDK内置大体思路跟这个类似,但要复杂的多。内置实现中也提供了Stream接口,以及Stream接口的各种实现类,本质类似于我们手写框架类的ArrayStream、MappedStream,通过装饰器模式提供额外功能。下图是核心的Stream实现类
我们用一个最简单的案例来看看Java的执行流程
Stream.of(1, 2, 3).filter(i -> i > 2).findAny().ifPresent(System.out::println);
这个调用的时序图如下图,核心步骤包括:
- Stream.of调用后,通过StreamSupport.stream创建一个ReferencePipeline.Head类实例
- filter(Prediccate)过滤流的时候,会把Head类实例封装为StatelessOp匿名内部类实例,并覆写了onWrapSink方法,在数据提交给Sink之前会先经过Predicate过滤
- findAny,创建一个FindOps实例,StatelessOp对象会调用FindOps的evaluateSequentail方法
- FindOps的evaluateSequentail方法,调用PipelineHelper类(ReferencePipeline实现该接口)的copyInto方法,copyInto方法内部会检测当前操作是否支持短路
- 支持短路操作,调用Spliterator的tryAdvance方法,Sink被设置为cancellationRequested的话就不操作之后的元素
- 不支持短路操作,调用Spliterator.forEachRemaining方法
现在我们完全理解Java Stream的运行流程了,再回过头来看看Java Stream中核心概念,Java Stream将操作分为两类,一类是中间操作,一类是终结操作
- 中间操作,继续返回stream够后续处理
- 终结操作,返回结果或Optional结束计算
中间操作被分为有状态操作(StatefullOps类)、无状态操作(StatelessOps类),终结操作都以TerminateOp表示,是否短路是通过combinedFlags标记的。Stream上不同API对应不同的操作符。
6. 如何评价Java Stream
有不少的文章/数据于对Java Stream做过性能测试,认为它是低效的。公平的将从实现上来说,Java Stream并没有明显存在严重影响性能的涉及。只是因为它有复杂的调用结构(各种包装器)所以对性能略有影响,而多数测试使用的都是极其简单的CPU密集型计算,所以显得对性能影响很大(有的甚至说比正常迭代慢2倍),如果是正常的业务逻辑完全不需要担心Stream会导致问题,而它带来的可读性是实实在在的。
下一篇我们来说说Java Stream的API支持哪些能力,怎么使用,如何扩展,尽情期待。