线程池
基本概述
线程池:一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作
线程池作用:
- 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
- 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
- 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
线程池的核心思想:线程复用,同一个线程可以被重复使用,来处理多个任务
池化技术 (Pool) :一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销
阻塞队列
基本介绍
Java中的阻塞队列(BlockingQueue)与普通队列相比有一个重要的特点:在阻塞队列为空时,会阻塞当前线程的元素获取操作。具体来说,在一个线程从一个空的阻塞队列中获取元素时线程会被阻塞,直到阻塞队列中有了元素;当队列中有元素后,被阻塞的线程会自动被唤醒(唤醒过程不需要用户程序干预)。
有界队列和无界队列:
-
有界队列:有固定大小的队列,比如设定了固定大小的 LinkedBlockingQueue,又或者大小为 0
-
无界队列:没有设置固定大小的队列,这些队列可以直接入队,直到溢出(超过 Integer.MAX_VALUE),所以相当于无界
java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:FIFO 队列
- ArrayBlockQueue:由数组结构组成的有界阻塞队列,是一个数组实现的有界阻塞队列(有界队列),队列中的元素按FIFO排序。ArrayBlockingQueue在创建时必须设置大小,接收的任务超出corePoolSize数量时,任务被缓存到该阻塞队列中,任务缓存的数量只能为创建时设置的大小,若该阻塞队列满,则会为新的任务创建线程,直到线程池中的线程总数大于maximumPoolSize启用拒绝策略。
- LinkedBlockingQueue:由链表结构组成的无界(默认大小 Integer.MAX_VALUE)的阻塞队列,也可以设置有界队列。如果默认无界队列,当接收的任务数量超出corePoolSize数量时,则新任务可以被无限制地缓存到该阻塞队列中,直到资源耗尽。有两个快捷创建线程池的工厂方法Executors.newSingleThreadExecutor和Executors.newFixedThreadPool使用了这个队列,并且都没有设置容量(无界队列)。
- PriorityBlockQueue:支持优先级排序的无界阻塞队列。
- DelayedQueue:使用优先级队列实现的延迟无界阻塞队列,这是一个无界阻塞延迟队列,底层基于PriorityBlockingQueue实现,队列中每个元素都有过期时间,当从队列获取元素(元素出队)时,只有已经过期的元素才会出队,而队列头部的元素是最先过期的元素。快捷工厂方法Executors.newScheduledThreadPool所创建的线程池使用此队列。
- SynchronousQueue:不存储元素的阻塞队列,每一个生产线程会阻塞到有一个 put 的线程放入元素为止。每次put时必须被get,否则一直阻塞。
与普通队列(LinkedList、ArrayList等)的不同点在于阻塞队列中阻塞添加和阻塞删除方法,以及线程安全:
- 阻塞添加 put():当阻塞队列元素已满时,添加队列元素的线程会被阻塞,直到队列元素不满时才重新唤醒线程执行
- 阻塞删除 take():在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般会返回被删除的元素)
核心方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入(尾) | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除(头) | remove() | poll() | take() | poll(time,unit) |
检查(队首元素) | element() | peek() | 不可用 | 不可用 |
- 抛出异常组:
- 当阻塞队列满时:在往队列中 add 插入元素会抛出 IIIegalStateException: Queue full
- 当阻塞队列空时:再往队列中 remove 移除元素,会抛出 NoSuchException
- 特殊值组:
- 插入方法:成功 true,失败 false
- 移除方法:成功返回出队列元素,队列没有就返回 null
- 阻塞组:
- 当阻塞队列满时,生产者继续往队列里 put 元素,队列会一直阻塞生产线程直到队列有空间 put 数据或响应中断退出
- 当阻塞队列空时,消费者线程试图从队列里 take 元素,队列会一直阻塞消费者线程直到队列中有可用元素
- 超时退出:当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者线程会退出
性能比较
主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较:
- Linked 支持有界,Array 强制有界
- Linked 实现是链表,Array 实现是数组
- Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
- Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
- Linked 两把锁,Array 一把锁
线程池的拒绝策略
任务被拒绝有两种情况:
- 线程池已经被关闭。
- 工作队列已满且maximumPoolSize已满。
无论以上哪种情况任务被拒绝,线程池都会调用RejectedExecutionHandler实例的rejectedExecution()方法。 RejectedExecutionHandler是拒绝策略的接口, JUC为该接口提供了以下几种实现:
- AbortPolicy:拒绝策略。如果线程池队列满了,新任务就会被拒绝,并且抛出RejectedExecutionException异常。该策略是线程池的默认的拒绝策略。
- DiscardPolicy:抛弃策略。该策略是AbortPolicy的Silent(安静)版本,如果线程池队列满了,新任务就会直接被丢掉,并且不会有任何异常抛出
- DiscardOldestPolicy:抛弃最老任务策略。抛弃最老任务策略,也就是说如果队列满了,就会将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列。因为队列是队尾进队头出,队头元素是最老的,所以每次都是移除对头元素后再尝试入队。
- CallerRunsPolicy:调用者执行策略。调用者执行策略。在新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务。
- 自定义策略,如果以上拒绝策略都不符合需求,那么可自定义一个拒绝策略,实现RejectedExecutionHandler接口的rejectedExecution方法即可。例如阻塞60秒策略。
操作Pool
创建方式
通过Executors工厂类创建
-
newSingleThreadExecuto 创建“单线程化线程池,只有一条线程的线程池,所创建的线程池用唯一的工作线程来执行任务,使用此方法创建的线程池能保证所有任务按照指定顺序(如FIFO)执行。
//异步任务的执行目标类 static class TargetTask implements Runnable {...} @Test public void testSingleThreadExecutor() { ExecutorService pool = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { pool.execute(new TargetTask()); pool.submit(new TargetTask()); } sleepSeconds(1000); //关闭线程池 pool.shutdown(); }
线程池有以下特点
- 单线程化的线程池中的任务,是按照提交的次序顺序执行的;
- 池中的唯一线程的存活时间是无限的;
- 当池中的唯一线程正繁忙时,新提交的任务实例会进入内部的阻塞队列中,并且其阻塞队列是无界的。
实现原理:线程个数始终为 1,不能修改。FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
-
newFixedThreadPool 创建固定数量的线程池,该方法用于创建一个“固定数量的线程池”,其唯一的参数用于设置池中线程的“固定数量”。
固定数量的线程池”的特点大致如下:- 如果线程数没有达到“固定数量”,每次提交一个任务池内就创建一个新线程,直到线程达到线程池固定的数量。
- 线程池的大小一旦达到“固定数量”就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
- 在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列)。
- “固定数量的线程池”的弊端:内部使用无界队列来存放排队任务,当大量任务超过线程池最大容量需要处理时,队列无线增大,使服务器资源迅速耗尽。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- LinkedBlockingQueue 是一个单向链表实现的阻塞队列,默认大小为
Integer.MAX_VALUE
,也就是无界队列,可以放任意数量的任务,在任务比较多的时候会导致 OOM(内存溢出) - 适用于任务量已知,相对耗时的长期任务
-
newCachedThreadPool 创建可缓存线程池
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
核心线程数是 0, 最大线程数是 29 个 1,全部都是救急线程(60s 后可以回收),可能会创建大量线程,从而导致 OOM
-
SynchronousQueue 作为阻塞队列,没有容量,对于每一个 take 的线程会阻塞直到有一个 put 的线程放入元素为止(类似一手交钱、一手交货)
-
适合任务数比较密集,但每个任务执行时间较短的情况。线程池没有最大线程数量限制,如果大量的异步任务执行目标实例同时提交,可能会因线程过多而导致资源耗尽。
-
-
newScheduledThreadPool 创建可调度线程池,一个提供“延时”和“ 周期性”任务的调度功能的ScheduledExecutorService类型的线程池。
//方法二:创建一个可调度线程池,池内含有N个线程, N的值为输入参数corePoolSize public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) ; //测试用例:“可调度线程池” @Test public void testNewScheduledThreadPool() { ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2); for (int i = 0; i < 2; i++) { scheduled.scheduleAtFixedRate(new TargetTask(), 0, 500, TimeUnit.MILLISECONDS); //以上的参数中: 0表示首次执行任务的延迟时间, 500表示每次执行任务的间隔时间 //TimeUnit.MILLISECONDS执行的时间间隔数值,单位为毫秒 } sleepSeconds(1000); //关闭线程池 scheduled.shutdown(); }
开发要求
阿里巴巴 Java 开发手册要求:
-
线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
- 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题
- 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者过度切换的问题
-
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回的线程池对象弊端如下:
- FixedThreadPool 和 SingleThreadPool:请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
- CacheThreadPool 和 ScheduledThreadPool:允许创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,导致 OOM
创建多大容量的线程池合适?
-
一般来说池中总线程数是核心池线程数量两倍,确保当核心池有线程停止时,核心池外有线程进入核心池
-
过小会导致程序不能充分地利用系统资源、容易导致饥饿
-
过大会导致更多的线程上下文切换,占用更多内存
上下文切换:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态,任务从保存到再加载的过程就是一次上下文切换
核心线程数常用公式:
-
CPU 密集型任务 (N+1): 这种任务消耗的是 CPU 资源,可以将核心线程数设置为 N (CPU 核心数) + 1,比 CPU 核心数多出来的一个线程是为了防止线程发生缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 某个核心就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如在内存中对大量数据进行分析
-
I/O 密集型任务: 这种系统 CPU 处于阻塞状态,用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用,因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N 或 CPU 核数/ (1-阻塞系数),阻塞系数在 0.8~0.9 之间
IO 密集型就是涉及到网络读取,文件读取此类任务 ,特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上
为什么禁止使用Executors快捷创建线程池?
- FixedThreadPool和SingleThreadPool 这两个工厂方法所创建的线程池,工作队列(任务排队的队列)长度都为Integer.MAX_VALUE,可能会堆积大量的任务,从而导致OOM(即耗尽内存资源)。
- CachedThreadPool和ScheduledThreadPool 这两个工厂方法所创建的线程池允许创建的线程数量为Integer.MAX_VALUE,可能会导致创建大量的线程,从而导致OOM问题。
- 所以,大厂的编程规范都不允许使用Executors创建线程池,而是要求使用标准构造器ThreadPoolExecutor创建线程池。
线程池的标准创建方式
大部分企业的开发规范都会禁止使用快捷线程池(具体原因稍后介绍),要求通过标准构造器ThreadPoolExecutor去构造工作线程池。 Executors工厂类中创建线程池的快捷工厂方法实际上是调用ThreadPoolExecutor (定时任务使用ScheduledThreadPoolExecutor )线程池的构造方法完成的。
ThreadPoolExecutor构造方法:
public ThreadPoolExecutor(
int corePoolSize,// 核心线程数,即使线程空闲(Idle), 也不会回收
int maximumPoolSize,// 线程数的上限
long keepAliveTime,// 线程最大空闲(Idle)时长
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, //任务的阻塞排队队列
ThreadFactory threadFactory, //新线程的产生方式
RejectedExecutionHandler handler //拒绝策略
)
参数介绍:
-
corePoolSize:核心线程数,定义了最小可以同时运行的线程数量,当在线程池接收到的新任务,并且当前工作线程数少于corePoolSize时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求,直到线程数达到corePoolSize。如果当前工作线程数多于corePoolSize数量,但小于maximumPoolSize数量,那么仅当任务排队队列已满时才会创建新线程。通过设置corePoolSize和maximumPoolSize相同,可以创建一个固定大小的线程池。
-
maximumPoolSize:最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的数量变为最大线程数,创建线程并立即执行最新的任务,与核心线程数之间的差值又叫救急线程数,救急线程是有空闲时长的 keepAliveTime,当达到最大空闲时长被回收。
-
keepAliveTime:救急线程最大存活时间,当线程池中的线程数量大于
corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等到keepAliveTime
时间超过销毁。但是如果调用了allowCoreThreadTimeOut(boolean)方法,并且传入了参数true,则eepAliveTime参数所设置的Idle
超时策略也将被应用于核心线程 -
unit:
keepAliveTime
参数的时间单位 -
workQueue:阻塞队列,存放被提交但尚未被执行的任务
-
threadFactory:线程工厂,创建新线程时用到,可以为线程创建时起名字
-
handler:拒绝策略,线程到达最大线程数仍有新任务时会执行拒绝策略
RejectedExecutionHandler 下有 4 个实现类:
- AbortPolicy:让调用者抛出 RejectedExecutionException 异常,默认策略
- CallerRunsPolicy:让调用者运行的调节机制,将某些任务回退到调用者,从而降低新任务的流量
- DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常
- DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交当前任务
补充:其他框架拒绝策略
- Dubbo:在抛出 RejectedExecutionException 异常前记录日志,并 dump 线程栈信息,方便定位问题
- Netty:创建一个新线程来执行任务
- ActiveMQ:带超时等待(60s)尝试放入队列
线程池的任务调度流程:
- 如果当前工作线程数量小于核心线程池数量,执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程。
- 如果线程池中总的任务数量大于核心线程池数量,新接收的任务将被加入到阻塞队列中,一直到阻塞队列已满。在核心线程池数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程。
- 当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光。
- 在核心线程池数量已经用完、阻塞队列也已经满了的场景下,如果线程池接收到新的任务,将会为新任务创建一个线程(非核心线程),并且立即开始执行新任务。
- 在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出maximumPoolSize。如果线程池的线程总数超过maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略。
线程工厂ThreadFactor
ThreadFactory是Java线程工厂接口,这是一个非常简单的接口,具体如下:
package java.util.concurrent;
public interface ThreadFactory {
//唯一的方法:创建一个新线程
Thread newThread(Runnable target);
}
在调用ThreadFactory的唯一方法newThread()创建新线程时,可以更改创建新线程的名称、线程组、优先级、守护进程状态等。如果newThread()返回值为null,表示线程工厂未能成功创建线程,线程池可能无法执行任何任务。使用Executors创建新的线程池时,也可以基于ThreadFactory(线程工厂)创建,在创建新线程池时 可 以 指 定 将 使 用 ThreadFactory 实 例 。
static public class SimpleThreadFactory implements ThreadFactory {
static AtomicInteger threadNo = new AtomicInteger(1);
//实现其唯一的创建线程方法
@Override
public Thread newThread(Runnable target) {
String threadName = "simpleThread-" + threadNo.get();
Print.tco("创建一条线程,名称为: " + threadName);
threadNo.incrementAndGet();
//设置线程名称和异步执行目标
Thread thread = new Thread(target, threadName);
//设置为守护线程
thread.setDaemon(true);
return thread;
}
}
//线程工厂的测试用例
@org.junit.Test
public void testThreadFactory() {
//使用自定义线程工厂,快捷创建一个固定大小线程池
ExecutorService pool =
Executors.newFixedThreadPool(2, new SimpleThreadFactory());
for (int i = 0; i < 5; i++) {
pool.submit(new TargetTask());
}
//等待10秒
sleepSeconds(10);
Print.tco("关闭线程池");
pool.shutdown();
}
//省略其他
}
注意:这里提到了两个工厂类,比较容易混淆,故作出说明。 Executors为线程池工厂类,用于快捷创建线程池( Thread Pool); ThreadFactory为线程工厂类,用于创建线程( Thread)。
确定线程数
- IO密集型:由于IO密集型任务的CPU使用率较低,导致线程空余时间很多,因此通常需要开CPU核心数两倍的线程。
- CPU密集型:CPU密集型任务虽然也可以并行完成,但是并行的任务越多,花在任务切换的时间就越多, CPU执行任务的效率就越低,所以要最高效地利用CPU, CPU密集型任务并行执行的数量应当等于CPU的核心数。
- 混合型任务:混合型任务既要执行逻辑计算,又要进行大量非CPU耗时操作,业界有一个比较成熟的估算公式,具体如下:
最佳线程数目 =(线程等待时间与线程CPU时间之比 + 1) * CPU核数
调度器的钩子方法
ThreadPoolExecutor线程池调度器为每个任务执行前后都提供了钩子方法。 ThreadPoolExecutor类提供了三个钩子方法(空方法),这三个空方法一般用作被子类重写,具体如下:
//任务执行之前的钩子方法(前钩子)
protected void beforeExecute(Thread t, Runnable r) { }
//任务执行之后的钩子方法(后钩子)
protected void afterExecute(Runnable r, Throwable t) { }
//线程池终止时的钩子方法(停止钩子)
protected void terminated() { }
- beforeExecute: 线程池工作线程在异步执行完成的目标实例(如Runnable实例)前调用此钩子方法。此方法仍然由执行任务的工作线程调用。
- afterExecute:线程池工作线程在异步执行目标实例后调用此钩子方法。此方法仍然由执行任务的工作线程调用。
- terminated:在Executor终止时调用,默认实现不执行任何操作。
提交方法
向线程池提交任务的两种方式,大致如下:
- 方式一: 调用execute()方法,例如:
//Executor 接口中的方法
void execute(Runnable command);
- 方式二: 调用submit()方法,例如:
//ExecutorService 接口中的方法
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
submit 和 execute两类方法的区别:
- 接受的参数不一样,execute()方法只能接收Runnable类型的参数,而submit()方法可以接收Callable、 Runnable两种类型的参数。 Callable类型的任务是可以返回执行结果的,而Runnable类型的任务不可以返回执行结果。
- submit() 方法提交任务有返回值,execute()无返回值;
- submit()方便Exception处理;通过submit()方法返回Future对象(异步执行实例),可以进行异步执行过程中的异常捕获。submit()方法自身并不会传递结果,而是返回一个Future异步执行实例,处理过程的结果被包装到Future实例中,调用者可以通过Future.get()方法获取异步执行的结果。通过submit返回的Future对象获取异步执行结果。
public class CreateThreadPoolDemo {
//省略其他
//测试用例:获取异步调用的结果
@Test
public void testSubmit2() {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
Future<Integer> future = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
//返回200~300之间的随机数
return RandomUtil.randInRange(200, 300);
}
});
try {
Integer result = future.get();
Print.tco("异步执行的结果是:" + result);
} catch (InterruptedException e) {
Print.tco("异步调用被中断");
e.printStackTrace();
} catch (ExecutionException e) {
Print.tco("异步调用过程中,发生了异常");
e.printStackTrace();
}
sleepSeconds(10);
//关闭线程池
pool.shutdown();
}
}
虽然用户程序通过submit()也可以提交任务,但是实际上submit()方法中最终调用的还是execute()方法。
ExecutorService 类 API:
方法 | 说明 |
---|---|
void execute(Runnable command) | 执行任务(Executor 类 API) |
Future<?> submit(Runnable task) | 提交任务 task() |
Future submit(Callable task) | 提交任务 task,用返回值 Future 获得任务执行结果 |
List<Future> invokeAll(Collection<? extends Callable> tasks) | 提交 tasks 中所有任务 |
List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) | 提交 tasks 中所有任务,超时时间针对所有task,超时会取消没有执行完的任务,并抛出超时异常 |
T invokeAny(Collection<? extends Callable> tasks) | 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 |
execute 和 submit 都属于线程池的方法,对比:
-
execute 只能执行 Runnable 类型的任务,没有返回值; submit 既能提交 Runnable 类型任务也能提交 Callable 类型任务,底层是封装成 FutureTask,然后调用 execute 执行
-
execute 会直接抛出任务执行时的异常,submit 会吞掉异常,可通过 Future 的 get 方法将任务执行时的异常重新抛出
关闭方法
线程池总共存在5种状态,定义在ThreadPoolExecutor类中。
线程池的5种状态具体如下:
- RUNNING:线程池创建之后的初始状态,这种状态下可以执行任务。
- SHUTDOWN:该状态下线程池不再接受新任务,但是会将工作队列中的任务执行完毕。
- STOP:该状态下线程池不再接受新任务,也不会处理工作队列中的剩余任务,并且将会
中断所有工作线程。 - TIDYING:该状态下所有任务都已终止或者处理完成,将会执行terminated()钩子方法。
- TERMINATED:执行完terminated()钩子方法之后的状态。
线程池的状态转换规则为:
1)线程池创建之后状态为RUNNING。
2)执行线程池的shutdown()实例方法,会使线程池状态从RUNNING转变为SHUTDOWN。
3)执行线程池的shutdownNow()实例方法,会使线程池状态从RUNNING转变为STOP。
4)当线程池处于SHUTDOWN状态,执行其shutdownNow()方法会将其状态转变为STOP。
5)等待线程池的所有工作线程停止,工作队列清空之后,线程池状态会从STOP转变为
TIDYING。
6)执行完terminated()钩子方法之后,线程池状态从TIDYING转变为TERMINATED。
优雅地关闭线程池主要涉及的方法有3种:
- shutdown:是JUC提供一个有序关闭线程池的方法,此方法会等待当前工作队列中的剩余任务全部执行完成之后才会执行关闭,但是此方法被调用之后线程池的状态转变为SHUTDOWN,线程池不会再接收新的任务。
- shutdownNow:是JUC提供一个立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务。
- awaitTermination:等待线程池完成关闭。在调用线程池的shutdown()与shutdownNow()方法时,当前线程会立即返回,不会一直等待直到线程池完成关闭。如果需要等到线程池关闭完成,可以调用awaitTermination()方法。
shutdownNow()方法将会把线程池状态设置为STOP,然后中断所有线程(包括工作线程以及空闲线程),最后清空工作队列,取出工作队列中所有未完成的任务返回给调用者。与有序的shutdown()方法相比, shutdownNow()方法比较粗暴,直接中断工作线程。不过这里需要注意的是,中断线程并不代表线程立刻结束,只是通过工作线程的interrupt()实例方法设置了中断状态,这里需要用户程序主动配合线程进行中断操作。调用了线程池shutdown()与shutdownNow()方法之后,用户程序都不会主动等待线程池关闭完成,如果需要等到线程池关闭完成,需要调用awaitTermination()进行主动等待。如果线程池完成关闭, awaitTermination()方法将会返回true,否则当等待时间超过指定时间后将会返回false。
总结:
- 执行shutdown()方法,拒绝新任务的提交,并等待所有任务有序地执行完毕。
- 执行awaitTermination(long timeout,TimeUnit unit)方法,指定超时时间,判断是否已经关闭所有任务,线程池关闭完成。
- 如果awaitTermination()方法返回false,或者被中断,就调用hutDownNow()方法立即关闭线程池所有任务。
- 补充执行awaitTermination(long timeout,TimeUnit unit)方法,判断线程池是否关闭完成。如果超时,就可以进入循环关闭,循环一定的次数(如1000次),不断关闭线程池,直到其关闭或者循环结束。
ExecutorService 类 API:
方法 | 说明 |
---|---|
void shutdown() | 线程池状态变为 SHUTDOWN,等待任务执行完后关闭线程池,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定任务) |
List shutdownNow() | 线程池状态变为 STOP,用 interrupt 中断正在执行的任务,直接关闭线程池,不会接收新任务,会将队列中的任务返回 |
boolean isShutdown() | 不在 RUNNING 状态的线程池,此执行者已被关闭,方法返回 true |
boolean isTerminated() | 线程池状态是否是 TERMINATED,如果所有任务在关闭后完成,返回 true |
boolean awaitTermination(long timeout, TimeUnit unit) | 调用 shutdown 后,由于调用线程不会等待所有任务运行结束,如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待 |
处理异常
execute 会直接抛出任务执行时的异常,submit 会吞掉异常,有两种处理方法
方法 1:主动捉异常
ExecutorService executorService = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
System.out.println("task1");
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
}
});
方法 2:使用 Future 对象
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<?> future = pool.submit(() -> {
System.out.println("task1");
int i = 1 / 0;
return true;
});
System.out.println(future.get());
Future
线程使用
FutureTask 未来任务对象,继承 Runnable、Future 接口,用于包装 Callable 对象,实现任务的提交
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello World";
}
});
new Thread(task).start(); //启动线程
String msg = task.get(); //获取返回任务数据
System.out.println(msg);
}
构造方法:
public FutureTask(Callable<V> callable){
this.callable = callable; // 属性注入
this.state = NEW; // 任务状态设置为 new
}
public FutureTask(Runnable runnable, V result) {
// 适配器模式
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 使用装饰者模式将 runnable 转换成 callable 接口,外部线程通过 get 获取
// 当前任务执行结果时,结果可能为 null 也可能为传进来的值,【传进来什么返回什么】
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
// 构造方法
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
// 实则调用 Runnable#run 方法
task.run();
// 返回值为构造 FutureTask 对象时传入的返回值或者是 null
return result;
}
}
成员属性
FutureTask 类的成员属性:
-
任务状态:
// 表示当前task状态 private volatile int state; // 当前任务尚未执行 private static final int NEW = 0; // 当前任务正在结束,尚未完全结束,一种临界状态 private static final int COMPLETING = 1; // 当前任务正常结束 private static final int NORMAL = 2; // 当前任务执行过程中发生了异常,内部封装的 callable.run() 向上抛出异常了 private static final int EXCEPTIONAL = 3; // 当前任务被取消 private static final int CANCELLED = 4; // 当前任务中断中 private static final int INTERRUPTING = 5; // 当前任务已中断 private static final int INTERRUPTED = 6;
-
任务对象:
private Callable<V> callable; // Runnable 使用装饰者模式伪装成 Callable
-
存储任务执行的结果,这是 run 方法返回值是 void 也可以获取到执行结果的原因:
// 正常情况下:任务正常执行结束,outcome 保存执行结果,callable 返回值 // 非正常情况:callable 向上抛出异常,outcome 保存异常 private Object outcome;
-
执行当前任务的线程对象:
private volatile Thread runner; // 当前任务被线程执行期间,保存当前执行任务的线程对象引用
-
线程阻塞队列的头节点:
// 会有很多线程去 get 当前任务的结果,这里使用了一种数据结构头插头取(类似栈)的一个队列来保存所有的 get 线程 private volatile WaitNode waiters;
-
内部类:
static final class WaitNode { // 单向链表 volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
成员方法
FutureTask 类的成员方法:
-
FutureTask#run:任务执行入口
public void run() { //条件一:成立说明当前 task 已经被执行过了或者被 cancel 了,非 NEW 状态的任务,线程就不需要处理了 //条件二:线程是 NEW 状态,尝试设置当前任务对象的线程是当前线程,设置失败说明其他线程抢占了该任务,直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 执行到这里,当前 task 一定是 NEW 状态,而且【当前线程也抢占 task 成功】 Callable<V> c = callable; // 判断任务是否为空,防止空指针异常;判断 state 状态,防止外部线程在此期间 cancel 掉当前任务 // 【因为 task 的执行者已经设置为当前线程,所以这里是线程安全的】 if (c != null && state == NEW) { V result; // true 表示 callable.run 代码块执行成功 未抛出异常 // false 表示 callable.run 代码块执行失败 抛出异常 boolean ran; try { // 【调用自定义的方法,执行结果赋值给 result】 result = c.call(); // 没有出现异常 ran = true; } catch (Throwable ex) { // 出现异常,返回值置空,ran 置为 false result = null; ran = false; // 设置返回的异常 setException(ex); } // 代码块执行正常 if (ran) // 设置返回的结果 set(result); } } finally { // 任务执行完成,取消线程的引用,help GC runner = null; int s = state; // 判断任务是不是被中断 if (s >= INTERRUPTING) // 执行中断处理方法 handlePossibleCancellationInterrupt(s); } }
FutureTask#set:设置正常返回值,首先将任务状态设置为 COMPLETING 状态代表完成中,逻辑执行完设置为 NORMAL 状态代表任务正常执行完成,最后唤醒 get() 阻塞线程
protected void set(V v) { // CAS 方式设置当前任务状态为完成中,设置失败说明其他线程取消了该任务 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 【将结果赋值给 outcome】 outcome = v; // 将当前任务状态修改为 NORMAL 正常结束状态。 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); } }
FutureTask#setException:设置异常返回值
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 赋值给返回结果,用来向上层抛出来的异常 outcome = t; // 将当前任务的状态 修改为 EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); finishCompletion(); } }
FutureTask#finishCompletion:唤醒 get() 阻塞线程
private void finishCompletion() { // 遍历所有的等待的节点,q 指向头节点 for (WaitNode q; (q = waiters) != null;) { // 使用cas设置 waiters 为 null,防止外部线程使用cancel取消当前任务,触发finishCompletion方法重复执行 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 自旋 for (;;) { // 获取当前 WaitNode 节点封装的 thread Thread t = q.thread; // 当前线程不为 null,唤醒当前 get() 等待获取数据的线程 if (t != null) { q.thread = null; LockSupport.unpark(t); } // 获取当前节点的下一个节点 WaitNode next = q.next; // 当前节点是最后一个节点了 if (next == null) break; // 断开链表 q.next = null; // help gc q = next; } break; } } done(); callable = null; // help GC }
FutureTask#handlePossibleCancellationInterrupt:任务中断处理
private void handlePossibleCancellationInterrupt(int s) { if (s == INTERRUPTING) // 中断状态中 while (state == INTERRUPTING) // 等待中断完成 Thread.yield(); }
-
FutureTask#get:获取任务执行的返回值,执行 run 和 get 的不是同一个线程,一般有多个线程 get,只有一个线程 run
public V get() throws InterruptedException, ExecutionException { // 获取当前任务状态 int s = state; // 条件成立说明任务还没执行完成 if (s <= COMPLETING) // 返回 task 当前状态,可能当前线程在里面已经睡了一会 s = awaitDone(false, 0L); return report(s); }
FutureTask#awaitDone:get 线程封装成 WaitNode 对象进入阻塞队列阻塞等待
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 0 不带超时 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 引用当前线程,封装成 WaitNode 对象 WaitNode q = null; // 表示当前线程 waitNode 对象,是否进入阻塞队列 boolean queued = false; // 【三次自旋开始休眠】 for (;;) { // 判断当前 get() 线程是否被打断,打断返回 true,清除打断标记 if (Thread.interrupted()) { // 当前线程对应的等待 node 出队, removeWaiter(q); throw new InterruptedException(); } // 获取任务状态 int s = state; // 条件成立说明当前任务执行完成已经有结果了 if (s > COMPLETING) { // 条件成立说明已经为当前线程创建了 WaitNode,置空 help GC if (q != null) q.thread = null; // 返回当前的状态 return s; } // 条件成立说明当前任务接近完成状态,这里让当前线程释放一下 cpu ,等待进行下一次抢占 cpu else if (s == COMPLETING) Thread.yield(); // 【第一次自旋】,当前线程还未创建 WaitNode 对象,此时为当前线程创建 WaitNode对象 else if (q == null) q = new WaitNode(); // 【第二次自旋】,当前线程已经创建 WaitNode 对象了,但是node对象还未入队 else if (!queued) // waiters 指向队首,让当前 WaitNode 成为新的队首,【头插法】,失败说明其他线程修改了新的队首 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 【第三次自旋】,会到这里,或者 else 内 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 阻塞指定的时间 LockSupport.parkNanos(this, nanos); } // 条件成立:说明需要阻塞 else // 【当前 get 操作的线程被 park 阻塞】,除非有其它线程将唤醒或者将当前线程中断 LockSupport.park(this); } }
FutureTask#report:封装运行结果,可以获取 run() 方法中设置的成员变量 outcome,这是 run 方法的返回值是 void 也可以获取到任务执行的结果的原因
private V report(int s) throws ExecutionException { // 获取执行结果,是在一个 futuretask 对象中的属性,可以直接获取 Object x = outcome; // 当前任务状态正常结束 if (s == NORMAL) return (V)x; // 直接返回 callable 的逻辑结果 // 当前任务被取消或者中断 if (s >= CANCELLED) throw new CancellationException(); // 抛出异常 // 执行到这里说明自定义的 callable 中的方法有异常,使用 outcome 上层抛出异常 throw new ExecutionException((Throwable)x); }
-
FutureTask#cancel:任务取消,打断正在执行该任务的线程
public boolean cancel(boolean mayInterruptIfRunning) { // 条件一:表示当前任务处于运行中或者处于线程池任务队列中 // 条件二:表示修改状态,成功可以去执行下面逻辑,否则返回 false 表示 cancel 失败 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // 如果任务已经被执行,是否允许打断 if (mayInterruptIfRunning) { try { // 获取执行当前 FutureTask 的线程 Thread t = runner; if (t != null) // 打断执行的线程 t.interrupt(); } finally { // 设置任务状态为【中断完成】 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 唤醒所有 get() 阻塞的线程 finishCompletion(); } return true; }