以往我们需要获取外部资源(数据源、Http请求等)时,需要对目标源创建链接对象,三次握手成功后方可正常使用,为避免持续的资源占用和可能的内存泄漏,还需要调用目标对象close方法释放资源销毁对象。这一建一销的过程在开发阶段倒感觉不到什么影响,但在生产环境,投产阶段中成百上千的并发请求涌入,小小的不足被不断放大,严重可拖垮系统。为节省资源初始化时间,提高输出有效性,池化资源应运而生。接下来我将从由来到原理,到应用场景讲解线程池,希望给各位带来新的启发。
什么是池化思想
池化思想就是预先分配申请好相关资源,并维护复用好这些资源,提前申请好资源,节省实时分配的时间成本,提升性能,降低相关资源损耗。它的应用场景主要以下(包含但不仅限于):
-
内存池(Memory Pooling):预先申请内存,提高申请内存的速度,减少内存碎片;
-
连接池(Connection Pooling):预先申请数据库连接,并保存一定存活的连接,降低系统开销,提高响应速度;
-
对象池(Object Pooling):循环使用对象,减少资源在初始化和销毁时相关资源损耗;
什么是线程池
是一种基于池化思想管理线程的工具,预先申请线程,重复利用的线程资源,它维护多个线程,任务到到达时无需等待线程的创建立刻执行。其优点主要如下(包含但不仅限于):
-
预先分配线程,提升系统响应速度;
-
降低线程创建和销毁的资源损耗;
-
可以对所有线程进行统一的管理和控制;
-
线程和任务分离,提升线程重用性;
JDK中线程池关键类( ThreadPoolExecutor)
Executor(顶层接口)
-
定义了线程池的标准,包含了一个方法execute,参数为一个Runnable接口引用
-
调用者只需要提交任务,不用关心任务执行细节
ExecutorService
-
继承Executor,拓展了更多的接口,包括线程池的关闭、状态管理、任务执行
AbstractExecutorService
-
对ExecutorService的接口提供默认实现;
-
Executor接口的execute方法是未被实现,execute方法是整个线程池的核心
-
所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略
ThreadPoolExecutor
-
主要作用是提供有参构造方法创建线程池,也是围绕着ExecutorService接口的功能做实现
-
一个可供应用程序开发人员扩展的Executor实现类,主要负责线程池的创建和控制
Executors
-
是一个工厂类,主要用来创建不同类型的线程池,是一个工具包方便开发者创建不同类型的线程池
线程池设计原理和核心参数配置
ThreadPoolExecutor线程池几个重要参数,什么时候会创建线程
-
查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第二步。
-
查看阻塞队列是否已满,不满就将任务存储在阻塞队列中,否则执行第三步。
-
查看线程池是否已满,即是否达到最大线程池数,不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务
-
注意:如果队列的任务为空,阻塞队列可以保证任务队列中没有任务时,阻塞获取任务的线程,线程进入wait状态,释放cpu资源
-
阻塞队列可以维持核心线程的存活且不一直占用cpu资源,而一般的队列只能作为一个有限长度的缓冲没有其他功能;
-
阻塞队列自带阻塞和唤醒的功能,无任务执行时线程池利用阻塞队列的take方法挂起;
-
参数 | 说明 |
---|---|
corePoolSize | 线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活 |
maximumPoolSize | 最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务 |
keepAliveTime | 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize |
TimeUnit | 时间单位 |
workQueue | 缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行 |
threadFactory | 线程创建的工厂,一般用默认的 Executors.defaultThreadFactory() |
handler | 当pool已经达到max size的时候,如何处理新任务 |
线程拒绝策略
任务队列满,最大线程数也创建满后,新任务进来会根据拒绝策略处理:
-
AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常;
-
CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行;
-
DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常;
-
DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列;
Executors创建常见线程池种类
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。——阿里巴巴Java手册
企业项目开发
-
日常开发项目,不会直接通过new thread方式进行多线程开发,而是通过线程池方式进行创建
-
JUC包下的Executors工具类提供多种线程池
线程池名称 | 说明 |
---|---|
newFixedThreadPool | 一个定长线程池,可控制线程最大并发数 |
newCachedThreadPool | 一个可缓存线程池, |
newSingleThreadExecutor | 一个单线程化的线程池,用唯一的工作线程来执行任务 |
newScheduledThreadPool | 一个定长线程池,支持定时/周期性任务执行 |
newFixedThreadPool
-
通过参数创建固定数量的线程
newCachedThreadPool
-
如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程;
-
该线程池会复用空闲的线程,从而减少创建对象和回收对象带来开销
-
使用
SynchronousQueue
队列,不存储元素的阻塞队列,每put一个必须等待take操作,否则不能添加元素
-
newSingleThreadExecutor
-
一个单线程化的线程池,用唯一的工作线程来执行任务
newScheduledThreadPool
-
线程池支持定时以及周期性执行任务,创建一个corePoolSize为传入参数
-
使用
DelayedWorkQueue
延迟队列,可以指定多久时间后才可以从队列获取元素
工具类创建线程池
【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:
1)FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OoM。
2)CachedThreadPool和ScheduledThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。——阿里巴巴Java手册
阿里巴巴编码规范
Executors创建的线程池底层也是调用 ThreadPoolExecutor,只不过使用不同的参数、队列、拒绝策略等, 如果使用不当,会造成资源耗尽问题,直接使用ThreadPoolExecutor让使用者更加清楚线程池允许规则,常见参数的使用,避免风险;
应用场景问题解析
商品详情页聚合接口
场景描述
微服务架构下,前端请求后端获取商品详情信息,需要把商品的基础、详情信息、库存、优惠活动等聚合起来,一并的返回给用户,使用了CompletableFuture+自定义线程池。为了
并行请求多个接口,响应时间更快,用户体验更好,基于阿里巴巴编码规范,创建了这样的线程池 (32核64G内存的机器)
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 32, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000),
new ThreadPoolExecutor.DiscardPolicy());
事故产生
大促活动下,多用户进入商品详情页出现超时,页面刷新不出来。
事故分析
高并发业务下,应该最大程度的快速响应用户,上述场景中队列设置过长,maxPoolSize设置失效,导致请求数量增加时大量任务堆积在队列中,应该不设置过大的队列去缓冲并发任务,而是调高corePoolSize和maxPoolSize,快速创建更多线程去处理请求。
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(32,128, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new ThreadPoolExecutor.DiscardPolicy());
商家管理后台业务报表数据统计
场景描述
线下电商全国有多个门店,需要每天凌晨统计相关数据指标,由于涉及到大量数据统计,且希望数据越快出来越好,使用了CompletableFuture+自定义线程池
进行跑批处理,基于阿里巴巴编码规范,创建了这样的线程池 (32核64G内存的机器)。
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8,1024, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new ThreadPoolExecutor.DiscardPolicy());
事故现象
海量数据处理统计下,经常出现服务端500错误
事故分析
这类场景需要执行大量的任务,但是对比高并发的快速响应, 这类业务是不需要那么快及时响应,而是需要充分利用好系统资源,更准确的完成相关统计,应该配置足够多的队列去缓冲更多待计算的任务,调整合适的corePoolSize去处理任务,也需要避免配置过多线程,由于线程的上下文切换也会导致大量资源的浪费。
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(32,124, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000),
new ThreadPoolExecutor.DiscardPolicy());
线程池的底层参数配置
互联网项目可以分IO密集还是CPU密集项目,不同项目配置不一样
-
CPU密集型项目主要消耗CPU,所以建议设置为跟核心数一样或者+1
-
IO密集型项目,系统会用大部分的时间来处理 I/O 交互,这段时间不占用CPU,所以线程可以配置多点,为2倍CPU核心数
但以上并非万能配置,最好根据压测情况看再进一步优化参数,且不同的模块要做好线程池的隔离。
线程池的状态转换和关闭API的区别
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* RUNNING -> SHUTDOWN
* On invocation of shutdown()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
*/
//int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池允许的最大线程数, 1左移29位,然后减1,即为 2^29 - 1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//获取线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//获取线程池worker数量
private static int workerCountOf(int c) { return c & COUNT_MASK; }
//根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池状态和数量维护:变量 ctl
这个AtomicInteger包含两部分的信息,使用的是位运算的方式,相比于基本运算,速度快很多:
-
运行状态 (runState) 高3位保存
-
线程池内有效线程的数量 (workerCount),低29位保存
线程池的状态转换:
线程池的状态
状态 | 说明 |
---|---|
RUNNING | 运行状态,能接受新提交的任务,并且也能处理阻塞队列中的任务 |
SHUTDOWN | 关闭状态,不再接受新提交的任务,可以继续处理阻塞队列中已保存的任务。 |
STOP | 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程 |
TIDYING | 所有的任务都已终止了,workerCount (有效线程数)为0 |
TERMINATED | terminated() 方法执行完后进入该状态 |
状态迁移:从小到大,-1,0,1,2,3 不会逆向迁移
线程池的关闭 shutdown和shutdownNow的区别
-
shutdown()
-
可以安全关闭线程池,调用 shutdown() 方法之后线程池并不是立刻就被关闭,这个时候线程池不能接受新的任务
-
线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务,
-
用 shutdown() 方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭,没有返回值
-
-
shutdownNow()
-
立刻关闭的意思,不推荐使用这一种方式关闭线程池
-
调用法后线程池不能接受新的任务,会给所有线程池中的线程发送 interrupt 中断信号
-
尝试中断这些任务的执行,会将任务队列中正在等待的所有任务转移到一个 List 中并返回,
-
可以根据返回的任务 List 来进行一些补救的操作,返回一个List< Runnable >
-
渗析核心源码
//获取线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//获取线程池worker数量
private static int workerCountOf(int c) { return c & COUNT_MASK; }
//根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任务执行
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 32位,高3位存储线程池状态,低29位存储活跃线程数
int c = ctl.get();
//判断工作线程小于核心线程,则创建新线程,true表示是核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断线程池是否运行,把任务放到队列里面去,返回boolean状态
if (isRunning(c) && workQueue.offer(command)) {
//再次获取值
int recheck = ctl.get();
//如果线程池已经终止,则移除任务,不在响应
if (! isRunning(recheck) && remove(command))
reject(command);
//如果没有线程,则创建一个空的worker,会从队列获取任务执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//队列满后,调用addWorker,创建非核心线程,参数是false,
else if (!addWorker(command, false))
//队列已满,创建非核心线程,失败则执行拒绝策略
reject(command);
}
//用于向线程池中添加一个新的工作线程。如果线程池中的线程数量已经达到maximumPoolSize,则返回false;
//如果线程池已经关闭,则返回false;否则,创建一个新的工作线程,并将其加入工作线程集合中
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
//判断线程数,根据传进来参数判断是创建线程数最大值
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//增加worker数量成功,返回到上面的retry
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
//新创建的worker,然后立刻启动,立刻执行任务(不是从队列中获取)
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//用于执行指定的工作线程,首先获取当前线程,然后不断从阻塞队列中取出任务并执行,直到从阻塞队列中取出null为止。
//在每次执行任务之前,会调用beforeExecute()方法和afterExecute()方法,这两个方法可以由子类进行重写,以实现一些特定的功能。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//一直循环判断,当前任务是否有,没的话getTask()从队列中获取任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//用于从阻塞队列中获取一个任务,如果当前线程数小于corePoolSize,则会调用workQueue的take方法阻塞在当前
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed用于超时控制,当allowCoreThreadTimeOut是true或者活跃线程数大于核心线程数,则需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//take和poll都是从队列头部【拿出】一个元素,从头部得到并移除该元素
//poll空队列的头部元素时返回null,不抛异常;而take方法对应获得空队列的头部元素时,会阻塞在获取的位置
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}