文章目录
- 前言
- submit方法定义
- Future是什么
- execute、submit方法区别是什么
- submit主干流程逻辑
- newTaskFor做了什么
- FutureTask
- newTaskFor(Runnable runnable, T value)的实现
- FutureTask(Callable<V> callable)的实现
- execute(ftask)
- FutureTask是如何实现线程池执行可获取返回值的
- 那FutureTask的run方法做了什么事情呢?
- run方法
- setException(Throwable t)
- set(V v)
- 将结果(或异常)赋值到outcome 后,如何获取?
前言
在上一篇文章中,已介绍了线程池ThreadPoolExecutor
的概念,运行流程,注意事项以及实战,以及详细拆解分析了线程池任务提交方法execute()。
ThreadPoolExecutor
类本身是没有submit方法的,但其继承了AbstractExecutorService
这个线程池抽象类,这个抽象类呢又实现了ExecutorService
接口,submit方法,正是ExecutorService
接口中的抽象类,这一点,在上篇博文类图分析中也有展示
具体可查看博文:万字详解-JAVA线程池分析实战与利弊详解
从类图得知,submit()一系列方法,属于线程池次顶层接口ExecutorService
的抽象方法,且该接口与execute()方法不同的是其不仅可以接收Runnable
、Callable
甚至还可以有返回值。
通读了上一篇文章后,我们应该清楚,线程池execute方法提交的任务执行后是没有返回值的,那么这个submit居然可以获取返回值,是如何实现的呢?我们带着疑问,走进源码。
submit方法定义
submit是接口ExecutorService
中的几个重载抽象方法
参数为callable的方法,返回一个Future
<T> Future<T> submit(Callable<T> task);
参数为Runnable 和预设结果 的方法,返回一个Future(runnable执行结束后,返回设置的result)
<T> Future<T> submit(Runnable task, T result);
参数为Runnable的方法,返回一个Future
Future<?> submit(Runnable task);
callable和runable一样,都是一个函数式接口(都可以使用Lambda表达式来实现方法)且都是实现线程的方式,区别就是callable可以获取返回值以及抛出异常
那我们如何界定我们调用的submit(Callable task) 方法 还是 submit(Runnable task)方法呢?
很简单,就看我们的lambda代码段逻辑中是否含有返回值就行了
ex:
() -> System.out.println(111) ,这段lambda代码块没有返回值,故此调用的是submit(Runnable task)
Future<?> submit = threadPool.submit(() -> System.out.println(111));
() -> 111 ,这段lambda代码块有返回值,故此调用的是submit(Callable task)
Future<Integer> submit = threadPool.submit(() -> 111);
Future是什么
点击源码查看,Future也是一个接口,其有五个抽象方法,分别是取消任务 、判断是否已取消任务、判断任务是否完成、获取任务结果(一直阻塞等待直至获取到结果)、指定时间内获取任务结果
其有这169+具体实现类,后边我们根据线程池submit的使用,再仔细剖析其中的个别具体实现
execute、submit方法区别是什么
二者的应用场景不同
execute
侧重于异步任务执行,且与调用线程无联动性,只需要提交了后让线程池慢慢消费就好了,任务的结果与调用者线程无关联和依赖。
流程示例Ex:
调用者线程 > 提交线程池执行异步任务(不关心异步任务是否有返回值以及返回值结果)
场景:
调用者线程进行逻辑校验,校验通过后,提交线程池进行短信发送
submit
既侧重与异步执行,又侧重于获取执行后的结果做逻辑处理 或者多个异步任务之间有着关联性
流程示例Ex:
调用者线程 > 提交线程池执行异步任务(关心异步执行结果)可能提交多个异步任务 > 调用者线程拿到多个异步任务结果后再进行逻辑处理
场景:
查询一个复杂的业务数据(这里假设是驾驶员)
调用者线程 判断是否有这个驾驶员 > 提交线程池执行异步任务(任务1 查询从业证 耗时300ms,任务2 查询车辆信息耗时 300ms,任务3查询驾驶证信息 耗时400ms)>等待所有异步任务执行完毕,调用者线程判断任务结果拼接数据返回前端
submit主干流程逻辑
ExecutorService接口的实现类有如下
因AbstractExecutorService
是ThreadPoolExecutor
的父类,且ThreadPoolExecutor
没有再次覆写submit相关方法,因此我们用 ThreadPoolExecutor
中submit系列方法,实际就是使用的父类 AbstractExecutorService
中的方法,所以我们故此我们从这里入手
下方是submit的3个实现
其主干流程很简洁明了
1 根据 Runnable 或 Callable构建RunnableFuture
2 execute执行逻辑
3 返回 RunnableFuture
newTaskFor做了什么
newTaskFor 是将我们的Runnable 或者 Callable 转换为Future或Future的实现,在AbstractExecutorService 中是转为了FutureTask
FutureTask
public class FutureTask<V> implements RunnableFuture<V> {}
FutureTask一种可取消的异步计算类。这个类提供了Future的基本实现,其中包含启动和取消计算、查询计算是否完成以及检索计算结果的方法
下方是FutureTask 这个类的方法展示以及树结构图,我们先简单过一下类中的方法与类结构图,后边会继续详解
上边有一些关键字段信息
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 要执行的任务,执行完后会清理
private Callable<V> callable;
// 任务执行的结果 (也可能是异常)
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
}
newTaskFor(Runnable runnable, T value)的实现
做的事情主要是有两个
1、将runnable转换为callable
调用了Executors.callable方法,将 runnable转换为callable
其实际就是继续将Runnable 使用 RunnableAdapter包裹了一下
自定义了一个RunnableAdapter类去实现 Callable,内部依然存的是runnable,和参数传入的预设结果
2、设置FutureTask的状态
设置FutureTask状态为新建
FutureTask(Callable callable)的实现
做的事情也是两个
1、将我们的callable设置到FutureTask的字段
2、设置FutureTask状态为新建
execute(ftask)
execute方法,调用的是顶层Executor
类中execute (Runnable command)方法
那如果使用ThreadPoolExecutor submit的话则实际就是使用 ThreadPoolExecutor类中 execute方法
那么execute执行的就是 execute(futureTask)
ThreadPoolExecutor类中 execute方法执行逻辑 可以参考我的上篇博文:万字详解-JAVA线程池分析实战与利弊详解
问题就来了,ThreadPoolExecutor执行execute是没有返回值的,我们使用FutureTask包了一下Runnable或者Callable就可以拿到返回值呢?
谜底就在谜面上,FutureTask内部逻辑为我们做了处理
FutureTask是如何实现线程池执行可获取返回值的
还记得上方的FutureTask类结构图吗? FutureTask就是一个Runnable的子类
而我们线程池执行execute会先创建Worker,然后执行Worker
Worker是啥?Worker不也是Runnable的一个包装么,最终都会执行run()方法,我们的FutureTask也一样,在线程池执行的时候,会调用run()方法
那FutureTask的run方法做了什么事情呢?
run方法
从主干流程来说比较简单 判断状态 > 执行callable > 捕获了异常,将结果设置到outcome >清理工作
public void run() {
// 判断线程状态
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
// 结果
V result;
// 是否成功执行
boolean ran;
try {
// 执行call (执行我们的业务逻辑)
result = c.call();
// 设置成功执行标签为true
ran = true;
} catch (Throwable ex)
//特别注意的是,这里捕获了异常
// 设置
result = null;
// 设置成功执行标签为false
ran = false;
// 设置异常信息
setException(ex);
}
if (ran)
// 成功执行了任务,设置结果
set(result);
}
} finally {
// 清理工作,清理线程和状态
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
setException(Throwable t)
这个方法是在FutureTask执行任务时,但出现了异常被捕获进入
把Throwable 赋值到了outcome (将异常设置到了结果字段中)
set(V v)
这个方法是在FutureTask 成功执行后进入,用来为结果字段outcome 赋值
将结果(或异常)赋值到outcome 后,如何获取?
获取结果,简单来说就是获取outcome字段的值
无超时时间,阻塞当前调用者线程,直至获取到结果
有超时时间,阻塞当前调用者线程一定时间,时间到了未获取到则为null