jdk线程池实现原理分析
目录
CompletionService
CompletableFuture
基本原理
CompletableFuture的接口
静态方法
handle() vs whenComplete()
xxxEither()
异常处理exceptionally()
获取任务结果
结束任务
Semaphore
CyclicBarrier
CountDownLatch
jdk线程池实现原理分析里说明了jdk中为什么需要线程,且线程池的实现,这里的两个重点:
- ThreadPoolExecutor对线程的封装,通过阻塞队列将线程和任务的概念区分开了,从而达到线程复用的目的。
- Future实现的promise框架。使得用户可以在父线程中获取子线程的执行结果:包括正常结果和异常结果。从而让使用者既享受到了异步变成的优势(减少阻塞等待),又能保持同步变成的简单性
Future其实是对将来某个时刻会发生的结果进行建模,封装了一种异步计算,返回这个异步计算结果的引用,通过这个引用可以获得运算的状态,以及当运算结束后可以通过这个引用获取异步运算的结果。这比直接使用Thread会好用很多。
通过Future之所以能够获得子线程异步计算的结果,其本质是因为父子线程是共享使用了进程分配的资源。FutureTask封装了这一过程,这个对象里保存了子线程的计算结果,当子线程执行结束的时候会讲结果写入到FutureTask中、而父线程获取的结果也正是FutureTask,通过父子线程都可以访问FutureTask的数据,从而实现了父线程可以获取到子线程的结果。
至于异步计算的状态,其实是FutureTask根据子线程执行任务的阶段定义出来的,用来管理子线程执行任务的过程的。
比如Future#get()其实就是根据这个状态机判断的,如果没有完成,就直接去阻塞当前调用get() 的线程。
简单贴一下FutureTask的最核心的对run()的重写,感受下FutureTask是如何实现让父线程获取子线程结果的:
另外,如果仔细观察过jdk就会发现:jdk中Thread封装了操作系统底层的线程,而给线程提交的任务只能是Runnable,但是如果使用线程池,就可以调Callable,是怎么做到的呢?答案也就在FutureTask中。
其实jdk中的ExecutSerive实现的线程池,向下对线程池的封装非常棒,但是对上提供的接口缺并没有很多,所以导致直接通过ExecutorService使用线程池,有些场景使用成本相对比较高,所以jdk在juc下面又提供了一些工具,来满足特定场景的多线程编程简单易用,这里面估计最出名也最常用的就是CompletionService和CompletableFuture
但Executor实现的Future的局限:
- Future只有一个cancel()接口,这个接口只是会给当前正在驱动对应任务的线程发送一个中断信号(Thread.interrupt()),依靠线程对中断的响应结束对应任务的执行。
这个方法并不支持返回一个指定值,如果我想要结束Future对应的任务,返回指定值,那是不可以的。
一个场景:通过线程池异步获取商品的最新价格,但是如果价格系统的接口如果出现了问题,就会长时间阻塞不返回,这个时候我就会降级从缓存中读取一个价格。如果要实现这个过程,就只能分为两部:1. 调用cancel()结束任务。2. 再去从缓存获取价格。
如果FutureTask直接有结束任务,用指定值返回的接口,那就比较方便了。 - Future的结果在非阻塞的情况下,不能执行更进一步的操作
Future不会通知你它已经完成了,它提供了一个阻塞的get()方法通知你结果。你无法给Future植入一个回调函数,当Future结果可用的时候,用该回调函数自动的调用Future的结果。
ps:Future确实没有提供回调方式,但是起实现FutureTask是有的扩展回调接口的,也正是有因为FutureTask有,所以juc的工具才能够实现子线程任务完成自动回调的。 - Future不具备链式编程能力,即不具备简单的任务编排能力
多线程编程中,其实就是将各个任务提交给线程池不同的线程去异步并行执行,然后最终再将各个子线程执行的任务结果进行回收组装。而这些任务之间,根据业务场景的不同,其实都是有一些依赖关系的,比如任务2依赖任务1的返回值(比如任务2的入参是任务1的返回值)等
ExecuteService是不具备去管理这些任务之间关系的能力的,那就只能是业务方自己去管理。 - Future只能通过catch的方式处理异常
这种方式其实有些不优雅,在所有future#get()的时候都要写一堆try-catch。一个更优雅的方式是如果发生异常,调用一个注册的callback,把异常传给callback就好了。 - Future能够支持的任务类型也只有Callable和Runnable。
ps:其实这两个已经够用了,如果有多个入参和返回值,无非就是要通过POJO封装一下就好了。只是说对于两个出入参的这种情况,也需要自己处理封装一下,稍微麻烦一点,不能拿来即用 - 任务之间不能实现组合
比如map-reduce场景。假如我要统计全国人口,可以让子线程去按照省来统计,然后所有子线程统计完各省情况后,父线程来进行汇总,得到全国的统计情况。那么ExecuteService是支持不了的,同样只能自己去做。
CompletionService
这个工具的作用就是封装了一次ThreadPoolExecutor线程池,然后按照提交给线程池任务的完成时间将对应的Future放到队列中,所以通过CompletionService就能够按照完成时间不阻塞的获取到任务的结果。
ps:这个是不是非常像epoll提供的能力。
其实现也非常简单,继承了FutureTask然后重写了FutureTask的isDone (),在isDone()中完成completionQueue的入队。所以completionQueue里就都是已完成的任务啦。
任务提交进来后,真正提交给线程池的就是重写的QueuingFuture就好了
CompletableFuture
这个工具就是对Future的一个补充,提供了多任务的编排能力、异常处理等能力,它其实基本上解决了所有的上面所说的Future对外暴漏的接口不够丰富的问题。
基本原理
CompletableFuture本质也是一个Future,所以是集成了Future接口的。同时还集成了CompletionStage接口。CompletionStage中对外暴漏的接口就是对Future的补充,提供的就是一些任务编排、异常处理、更丰富的任务形式封装等等。
从命名上也可以看出,CompletableFutue的思路就是将多个任务看成是一个大的任务的各个阶段,每个阶段(子任务)都是可以通过子线程来驱动完成,而CompletionStage表示的就是对一个子任务的封装。在CompletionStage中提供了子任务的组合、依赖等简单关系的编排,从而构成一个大的任务。CompletableFuture的结构是比较简单的,它就只有两个字段:
- Object result:用于保存当前任务的执行结果。
- Completion stack:Completion结构其实是一个链表的node,用它来封装子任务,并将子任务构成一个链表,依靠这个链表来实现任务的编排。
- Executor asyncPool:用于异步执行任务的线程池。
重点看下Completion结构。Completion用来封装一个具体任务的链表,只是说不同的任务类型,有自己特殊的要求,所以不同任务类型继承Completion添加自己的特性。
Completion的uml图:
CompletableFuture的实现,主要依赖了UniCompletion,它里面定义了三个字段:
- Executor executor:执行当前任务的线程池。
如果是同步执行executor=null,任务在当前线程中执行;
如果提交任务的时候没有指定线程池,默认就是forkJoin线程池。 - CompletableFutre src:上一个任务的CompletableFuture
- CompletableFuture dep:当前提交的任务的CompletableFuture。
比如cf.thenXX(fun),表示的就是cf成功执行后,执行fun,这个调用会new一个Completion对象,这个新建的Completion封装的就是对当前任务的封装,其中的src就是cf,dep就是new出来的fun任务对应的CompletableFuture
CompletableFuture实现中,能够支持的任务类型,都是UniCompletion的子类,有些类型的任务需要一些额外的参数,那就会集成UniCompletion,然后添加自己需要的属性。
比如BiCompletion增加了另一个CompletableFutre snd属性:用来支持xxXXBoth()/xxXXXEither()这种操作,这些操作的特点是都会传入一个CompletionStage(CompletableFuture实现了该接口),表示两个都或者任意一个完成的时候,执行指定的任务。BiCompletion中的snd属性,记录的就是入参的CompletableFuture
比如cf.thenRunBoth(cf1,fun)表示的是cf和cf1代表的任务都执行完成后,执行fun。执行cf.thenRunBoth(cf1,fun),就会创建一个BiRun对象,其中的src就是cf、dep是new的一个CompletableFuture、snd就是入参的cf1
下面举例简单说明下,CompletableFuture是如何通过Completion链表来实现简单的任务编排的,以cf.thenXXX(fun1).handle(fun2)为例:它表示的是:
- 如果cf正常执行完后,就执行fun1,fun1执行完后再执行fun2;
- 如果cf执行出错,就不执行fun1,直接执行fun2。
其中:newCf封装的就是fun1对应的任务
再次使用cf提交任务:
其中:newCf封装的就是fun1对应的任务,newCf2封装的就是fun2对应的任务。如下以thenApplay()解释下:
private <V> CompletableFuture<V> uniApplyStage(Executor executor, Function<? super T,? extends V> fun) {
if (fun == null) {
throw new NullPointerException();
}
// 这个就是封装了当前任务的fun的Future,父线程通过这个Future来获取当前任务的运行结果
CompletableFuture<V> depCf = new CompletableFuture<V>();
// dep.uniApply()
// 1. 判断上个任务是否完成,如果没完成,就直接返回false。上个任务指的就是cf.thenApply(fun)中的cf表示的任务
// 2. 如果上个任务已经完成:
// 2.1 如果上个任务是异常结束的,则啥也不干就直接返回了
// 2.2 如果上个认为是正常返回的,那么就会调用fun的方法,执行任务(如果是异步执行就是提交给线程池(c.claim())、如果是同步执行就直接调用fun.apply()方法)
if (executor != null || !depCf.uniApply(this, fun, null)) {
// UniApply是Completion的子类,表示的是一个入参一个返回值任务(Function任务)。
CompletableFuture.UniApply<T,V> newCf = new CompletableFuture.UniApply<T,V>(executor, depCf, this, fun);
// 将新建的Completion节点加入到链表中。
push(newCf);
// 这个就是真正将任务提交给线程池执行的,其内部也是调用depCf.uniApply()
newCf.tryFire(SYNC);
}
return depCf;
}
CompletableFuture的接口
静态方法
这些静态方法都是指定一个任务初始化一个CompletableFuture的,也就是说这里制定的任务就是后续编排的起点,即第一个任务。
任务编排之前一任务成功执行才执行指定任务thenXXX()
上一个任务成功后,才执行制定任务;如果上一个任务执行失败,就不会执行指定任务。
举个例子:cf.thenXXX(fun):当cf代表的任务正常结束后,去执行fun任务,否则就不会执行fun。
对于指定任务的执行可以是同步执行、也可以是异步执行,就看方法名是否带有async的后缀。如果是cf.thenXXXAsync(fun),那么fun的执行就是在异步线程的,这个时候,如果指定了线程池就是在指定线程中的线程执行;如果不指定就是在forkjoin默认线程池中执行。
thenXXX()支持的几个任务类型:
- Runnable:无入参,无返回值
- Comsumer:一个入参,无返回值
- Function:一个入参,一个返回值
- BiFunction:两个入参,一个返回值
thenXXX()支持的几个方法:
- cf.thenRun(Runnable):这个是最简单的,没有入参也没有返回值的任务。
- cf.thenAccept(Consumer):一个入参、没有返回值。其入参就是cf的返回值,如果cf的任务是无返回值,theAccept()在执行Consumer的时候,从cf中获取到的result就是null,所以传入给Consumer的入参也就是null。
由于要讲cf的返回值传给Consumer,所以Consumer的入参类型必须是cf的返回值的子类型。 - cf.thenApply(Function):一个入参一个返回值。入参规则和thenAccept()是一样的,返回值可以是任意类型。
- cf.thenCompose(Function):同样是一个入参一个返回值,入参规则和thenApply()是一样的,但是thenCompose()的返回值有特殊要求:返回值必须是CompletionStage,但需要注意的是,虽然thenCompose()返回的是一个CompletionStage类型,但是在CompletableFuture内部的result字段保存的是对应的thenCompose()返回的CompletionStage任务的返回值,所以cf.thenCompose().thenAccept(),最后一个thenAccept的入参类型不是thenCompose()返回的CompletionStage类型,而是thenCompose()返回的CompletionStage的结果的类型。
ps:thenAppley()和thenCompose()的区别就像stream api中map和flatmap的区别:map()返回任意类型,而flatMap()要求返回Stream类型 - cf.thenCombine(cf2,BiFunction):当cf和cf2都正常返回的时候才会执行BiFunction,然后将cf和cf2的返回值作为BiFunction的入参:第一个入参是cf的返回值,第二个入参是cf2的返回值。
注意:
- cf.runAsync(runnable1).thenRun(runnable2).thenRunnable(runnable3),第一个是异步执行,而后续的是同步执行,但是同步执行的意思是和runnable1而言的,即runnable1、runnable2、runnable3是在同一个线程串行执行的
- cf.thenRun(runnable1).thenRun(runnable2).thenRunnable(runnable3),如果是第一个抛出了一场,后续的runnable2、runnable3都不会执行了。
- cf.thenRun(runnable1).thenAccept(consumer1).thenApply(function1),如果任务有入参,那么入参就是上个任务的返回值,如果上个任务无返回值,则入参将被传入null(CompletableFuture中的result字段没有被赋值,默认就是null,所以实际上传入给下个任务的入参就是CompletableFuture#result字段)。
所以,下个任务的入参类型,要求必须是上个任务的返回值的子类型,否则传递就会报错。不过CompletableFuture使用了泛型限制了类型,所以如果类型不对,编译就会报错
handle() vs whenComplete()
这两个方法的语义都是一样的,唯一的区别是handle()接收的是BiFunction任务有返回值、whenCompelte接收的是BiConsumer任务,没有返回值:
不管上个任务是正常执行结束还是异常结束,都将执行当前指定的任务
举个例子:cf.handle(bifunction)/cf.whenComplete(biconsumer):不管cf对应的任务是正常执行还是异常执行,都将执行bifunction任务。
bifunction/biconsumer任务有两个入参:
第一个入参就是cf的返回值,所以要求第一个入参的类型需要是cf返回值的子类型。如果cf没有返回值或者异常结束返回的就是null
第二个参数就是cf的执行抛出的异常,如果cf正常结束,传入的就是null
xxxEither()
其入参都是一个CompletionStage类型,只是不同的xxx入参的CompletionStage的任务类型有不一样+一个任务
其语义就是当cf对应的任务+指定的CompletionStage中,有任意一个正常完成了,就执行制定的任务。但如果只要有一个抛出了异常,就都不会执行指定任务了
如果指定任务有入参(Consumer、Function),那么入参就是先执行完成的那个的返回值
- xxxEither支持三种任务(当然都有对应的Async版本的重载)
- cf.runAfterEither(CompletionStage<Runnable>,runnable)
- cf.acceptEither(CompletionStage<Consumer>,consumer);
- cf.applyToEither(CompletionStage<Function>,function);
举个例子:cf.acceptEither(consumerCf1, consumer),当cf、consumerCf1有任何一个正常执行返回了,则就会执行consumer指定的任务,那么传给consumer的入参就是最先执行完成的那个任务的返回值。但是cf、consumerCf1有任何一个抛出了一场,就都不会执行consumer任务了。
ps:这个只是支持两个的任意一个,但是CompletableFuture#anyOf()静态方法就可以是任意多个
xxxBoth()
这个和xxxEither()是差不多的,只不多是cf和指定任务都正常完成,才执行指定的任务。只有有任意一个抛异常,就不会执行指定任务了
xxxBoth()就两个(有对应的Async的重载形式)
- cf.runAftertBoth(CompletableFuture<Runnable>,runnable)
- cf.thenAcceptBoth(CompletableFuture<Consumer> cf2,biConsumer):biConsumer的两个入参按顺序分别是cf的返回值和cf2的返回值
异常处理exceptionally()
上个任务出现异常的时候才执行指定任务。
cf.exceptionally(function):
- 只有cf对应的任务执行异常的时候,才会执行指定任务fun
- 执行的任务fun是有一个入参一个返回值的,入参就是上个任务执行时抛出的异常,fun的结果会记录到当前任务的CompletableFuture#result中。
获取任务结果
查看任务执行状态的
- cf.isCancelled();
- cf.isCompletedExceptionally();
- cf.isDone();
- cf.getNumberOfDependents()
获得执行结果的
- cf.get();
- cf.get(1,TimeUnit.SECONDS);
- cf.getNow(null); 这个是相比于Future多出来的,如果任务没完成,不阻塞,直接返回指定值。
还有一个等待任务执行完成的方法:cf.join();
结束任务
Future#cancel()方法,如果任务开始了,会发中断信号给驱动任务执行的线程,只要是响应中断的任务,那么收到中断信号后,就不会执行任务了。
但是CompletableFuture结束任务的方式实际上根本没有真正去结束线程,只是修改了CompletableFuture中维护的结果,这样可以让CompletableFuture#get()可以立即返回,但实际线程正在执行的任务不会像Future#cancel()一样,可能会被终止掉,所以提交给CompletableFuture管理的任务是一个写入任务的时候,就要小心使用主动结束任务方法了。
CompletableFuture现在的实现是支持不了中断的,因为CompletableFuture的实现只是将任务提交给了线程池,根本不知道谁驱动的任务执行,而FutureTask是记录了驱动任务执行的线程的。
CompletableFuture有三个方法可以结束任务:
-
cf.cancel(true):将结果设置为异常CancellationException。因为将result设置成了异常,所以如果任务还没开始,那么就不再会执行了。但是任务已经开始了,那么就会让任务执行结束,只是拿不到任务执行的结果而已,入参mayInterruptIfRunning根本就没有使用 cf.complete(null):仅仅是将CompletableFuture#result的值设置成指定的值 obtrudeException(excepteEx)/cf.completeExceptionally(ex):将结构设置成了指定的异 常。这两个卫衣的区别就是设置值的方式:前者是直接赋值、后者是CAS赋值。
总结:
- 详细的使用可参考:Java 8 CompletableFuture 教程 - yexiaoxiaobai - SegmentFault 思否
- 要正确使用CompletableFuture并不容易,要对其实现的逻辑比较清楚,方可正确使用,所以在使用的时候还是要多注意,所以个人不是很建议使用CompletableFuture,比较容易踩坑
- 使用cf.thenXXX()的时候,要注意cf异常后,就不会执行指定任务了,而且稍不注意就讲异常给吞掉了,比较难以发现为什么xxx没有执行,排查问题并不好排潮
- cf.thenXXX(),如果指定任务是有入参的,那么指定任务的入参和cf的返回值是有关系的,要注意cf返回值和指定任务入参的集成关系
- cf.xxBoth()的时候,指定任务是cf和指定CompletionStage的返回值,是按照顺序赋值的,这要小心,当心搞错。
- handl()和whenComplete()两个的区别,真的不容易发现,特别是cf.whenComplete()在new的CompletableFuture#result记录的是和cf一样的结果,那么如果cf.whComplete().thenXXX(fun),那么传递给fun的入参其实是cf的返回值,而cf.handle(hfun).thenXXX(fun)传递给fun的入参就是hfun指定的结果,而不是cf。
- cancel()/complete()/obtrudeException()这些都是不支持中断的,这个如果使用场景取消是要结束任务的,而任务又是一个写入操作的时候,就要格外注意。
Semaphore
信号量的实现。常见的使用场景就是对象池,比如db的连接池、rpc的连接池等
使用上可参考:简单对象池(享元模式)的实现
CyclicBarrier
CyclicBarrier - 简书