前言
大龄程序员老王
老王是一个已经北漂十多年的程序员,岁数大了,加班加不过年轻人,升迁也无望,于是拿着手里的一些积蓄,回老家转行创业。他选择了洗浴行业,开一家洗浴中心,没错,一家正规的洗浴中心。之前在北京的时候,喜欢去的澡堂叫“清华池”,他想了想,就给自己的洗浴中心取名为“线程池”。
线程池洗浴中心
线程池开业以后,老王发现有顾客想做足疗,于是就招聘了1个足疗技师,多增加了一项业务增加了收入。随着做足疗的顾客增多,为了赚更多钱又招聘了4个足疗技师。
过了一段时间,洗浴中心的生意越来越好,做足疗的顾客也越来越多。但是,老王发现自己店里的足疗技师已经有5个足疗技师,再招聘就太多了,支付不起再多工资了。足疗技师忙不过来怎么办?老王是个聪明人,马上想到办法:让顾客排队,有哪个足疗技师做完了,空闲出来了,就在队伍里再叫一个顾客继续做。
忙碌的周末
一到周末,来洗浴中心的顾客比平时多了几倍,想足疗的顾客排队时间太长,顾客们已经不耐烦了。老王马上做出反应,又紧急从其他洗浴中心招聘了5个足疗技师,为队伍里顾客做足疗,大大减少排队的顾客。
不过,有时生意太火爆了,紧急招聘的技师也用上了,顾客排队时间也是很长,再来新的顾客,老王只能满脸赔笑地和顾客说:“您下次再来吧,下次给您找个好技师。”,把顾客拒之门外。
过了周末以后,店里不能养闲人啊,老王就把紧急招聘的技师都辞退了。
老王的经营之道
老王的生意越做越红火,很快就要开分店、融资上市、走上人生巅峰。既然这么成功,就让我们来复盘一下他的经营之道吧:
————————————————
1.线程池简介
1.1 线程基本概念
线程生命周期如下图:
新建:java.lang.Thread.State.NEW
public static void thread_state_NEW(){
Thread thread = new Thread();
System.out.println(thread.getState());
}
就绪:java.lang.Thread.State.RUNNABLE
public static void thread_state_RUNNABLE(){
Thread thread = new Thread();
thread.start();
System.out.println(thread.getState());
}
超时等待:java.lang.Thread.State#TIMED_WAITING
public static void thread_state_SLEEP(){
Thread thread3 = new Thread(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} });
thread3.start();
Thread.sleep(500);
System.out.println(thread3.getState());
}
等待:java.lang.Thread.State.WAITING
public static void thread_state_WAITING(){
Thread thread2 = new Thread(new Runnable() {
public void run() {
LockSupport.park();
}
});
thread2.start();
Thread.sleep(500);
System.out.println(thread2.getState());
LockSupport.unpark(thread2);
}
阻塞:java.lang.Thread.State.BLOCKED
public static void thread_state_BLOCKED(){
final byte[] lock = new byte[0];
Thread thread1 = new Thread(() -> {
synchronized (lock){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} }
});
thread1.start();
Thread thread2 = new Thread(() -> {
synchronized (lock){
} });
thread2.start();
Thread.sleep(1000);
System.out.println(thread1.getState());
System.out.println(thread2.getState());
}
销亡:java.lang.Thread.State.TERMINATED
public static void thread_state_TERMINATED(){
Thread thread = new Thread();
thread.start();
Thread.sleep(1000);
System.out.println(thread.getState());
}
1.2 线程池基本概念
1.2.1 为什么用线程池
项目中使用线程池也有一些注意事项,参照《Java开发手册 - 泰山版》说明:
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这 样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 返回的线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
1.2.2 原理
线程池(ThreadPool):线程池就是创建一个缓冲池存放线程,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等候下次任务来临,这使得线程池比手动 创建线程有着更多的优势,
常应用于高并发场景下。使用多线程对代码效率进行优化,因此,试用线程池比手动创建线程有着更多的优势:
降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM
节省cpu切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)
提供更强大的功能,延时定时线程池。 Timer vs ScheduledThreadPoolExecutor
常见的线程池结构(UML)
Executor
执行者:顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。
ExecutorService
扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方
提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService
上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可
ThreadPoolExecutor
最常用的线程池,一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务
1.2.3 线程池状态
RUNNING:接受新任务并处理排队的任务。
SHUTDOWN:不接受新任务,但处理排队的任务。
STOP:不接受新任务,不处理排队的任务,并中断正在进行的任务。
TIDYING:所有任务都已终止,workerCount 为零,线程转换到 TIDYING 状态将运行 terminated() 钩子方法。
TERMINATED:terminated() 已完成。
上述为线程池的五种状态,那么这五种状态由什么记录呢?mark一下~下面详细介绍。
1.2.4 执行过程
假设场景:
创建线程池,无限循环添加task,debug看works和queue数量增长规律
等待一段时间后,查看works数量是否回落到core
先附结论:
添加任务,如果线程池中线程数没达到coreSize,直接创建新线程执行
达到core,放入queue
queue已满,未达到maxSize继续创建线程
达到maxSize,根据reject策略处理
超时后,线程被释放,下降到coreSize
2.工作原理
参数介绍
首先我们了解下ThreadPoolExecutor的构造函数
从源码中可以看出,ThreadPoolExecutor的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下面会对这7个参数一一解释
2.1 corePoolSize 核心线程数量:
线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。任务提交到线程池后,首先会检查当前线程数是否达到了corePoolSize,如果没有达到的话,则会创建一个新线程来处理这个任务。
2.2 maximumPoolSize 线程池最大线程数量:
当前线程数达到corePoolSize后,如果继续有任务被提交到线程池,会将任务缓存到工作队列(后面会介绍)中。如果队列也已满,则会去创建一个新线程来出来这个处理。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。
2.3 keepAliveTime 空闲线程存活时间:
一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定
2.4 unit 空闲线程存活时间单位:
keepAliveTime的计量单位,常用 SECONDS(秒) MILLISECONDS(毫秒)
2.5 workQueue 工作队列:
任务队列,用于传输和保存等待执行任务的阻塞队列。当corePoolSize均初始化完成后,再来任务就会直接存入queue中,线程通过getTask()方法自旋获取任务。常见的队列设置如下:
①ArrayBlockingQueue 数组阻塞队列:
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
②※LinkedBlockingQuene 链表阻塞队列(注意:可以指定长度):
基于链表的无界阻塞队列(默认最大容量为Interger.MAX,可以指定长度),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而基本不会去创建新线程直到maxPoolSize(很难达到Interger.MAX这个数),因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
③SynchronousQuene 同步队列:
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
④PriorityBlockingQueue 优先级阻塞队列:
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
2.6、threadFactory 线程工厂:
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等
2.7、handler 拒绝策略:
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:
①CallerRunsPolicy
该策略下,在调用者线程中直接执行被拒绝任务的run方法。
②AbortPolicy
该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
ps:ThreadPoolTaskExecutor默认
③DiscardPolicy
该策略下,直接丢弃任务,什么都不做。
④DiscardOldestPolicy
该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
3 源码解析
介绍了上述线程池的基本信息后,接下来开始源码解析。首先再看个源码基础概念。
3.1 基础概念:CTL
什么是“ctl”?
ctl 是一个打包两个概念字段的原子整数。
1)workerCount:指示线程的有效数量;
2)runState:指示线程池的运行状态,有 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 等状态。
int 类型有32位,其中 ctl 的低29为用于表示 workerCount,高3位用于表示 runState,如下图所示。
源码介绍:
/**
* 主池控制状态ctl是包含两个概念字段的原子整数: workerCount:指有效的线程数量;
* runState:指运行状态,运行,关闭等。为了将workerCount和runState用1个int来表示,
* 我们限制workerCount范围为(2 ^ 29) - 1,即用int的低29位用来表示workerCount,
* 用int的高3位用来表示runState,这样workerCount和runState刚好用int可以完整表示。
*/
// 初始化时有效的线程数为0, 此时ctl为: 1010 0000 0000 0000 0000 0000 0000 0000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 高3位用来表示运行状态,此值用于运行状态向左移动的位数,即29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程数容量,低29位表示有效的线程数, 0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 大小关系:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,
* 源码中频繁使用大小关系来作为条件判断。
* 1110 0000 0000 0000 0000 0000 0000 0000 运行
* 0000 0000 0000 0000 0000 0000 0000 0000 关闭
* 0010 0000 0000 0000 0000 0000 0000 0000 停止
* 0100 0000 0000 0000 0000 0000 0000 0000 整理
* 0110 0000 0000 0000 0000 0000 0000 0000 终止
*/
private static final int RUNNING = -1 << COUNT_BITS; // 运行
private static final int SHUTDOWN = 0 << COUNT_BITS; // 关闭
private static final int STOP = 1 << COUNT_BITS; // 停止
private static final int TIDYING = 2 << COUNT_BITS; // 整理
private static final int TERMINATED = 3 << COUNT_BITS; // 终止
runstate获取: