基本知识
各位大佬在面试过程肯定会被问到线程池或者多线程的问题,例如:
- 线程池核心参数及其作用
- 线程池添加任务的执行顺序
- 任务队列以及任务的拒绝策略
- 等等
这些问题是相信稍微研究过线程池JDK源码的都能掌握。有兴趣的可以参数这篇博文。
在进入今天正题之前,还是来看看第二个问题:线程池添加任务后,线程池内部是如何进行执行任务的。假设线程池中
- 核心线程数为1
- 最大线程数为2
- 队列是长度为10的有界阻塞队列
线程池执行任务顺序如下:
- 当添加第一个任务(任务A)时,判断当前线程数是否小于核心线程数,是则直接创建新的线程执行该任务。
- 假设任务A还没有执行完,继续添任务B,此时当前线程数等于核心线程数,线程池会直接将任务添加到阻塞队列中。
- 在任务A还没有执行完的情况下,持续不断地添加任务,直到队列填满了任务,还有任务C需要添加,此时会判断线程数是否大于等于最大线程数,小于则会创建新的线程执行任务C,大于则拒绝策略处理
- 当线程池2个线程都在工作且队列已满,任务D需要执行时,线程池则会根据任务拒绝策略处理任务D
在了解了线程池执行任务的原理后,再来看看如下几个问题
- 线程保活。
- 线程池如何动态更新核心线程数和最大线程数?
- 提交Callable任务时,线程之间是如何传递值的?
- 多余的线程是如何结束生命周期的?
线程保活
假设线程(简称T)执行任务A结束后,间隔5s后再向线程池添加任务B,思考这5s线程T是如何保活的或者说它是什么状态。添加任务B的线程是如何让线程T执行任务B的。如果不理解线程的状态转换和阻塞队列的原理,建议劝退。
- 线程A执行execute(taskA)时,会调用addWorker()方法创建线程T,并调用T.start()启动线程T。线程T会执行runWorker()开始while循环,循环条件是第一个任务不等于null或者getTask()(从任务队列里获取任务)不等于null,显然此时第一个任务为taskA,线程T会执行taskA.run()执行任务。执行完taskA后,继续循环判断是否存在第一个任务或者从任务队列里获取任务,显然第一个任务已经执行完成,线程T再执行getTask()会调用队列的take()获取任务,此时队列为空,线程T则会执行Condition.await()方法(即AQS的await方法),await()方法中会调用LockSupport.park(this),最终将自己设置为WAITING状态,也就是所谓的保活(或者说挂起)。
- 5s之后线程B执行execute(taskB),此时线程数量等于核心线程数,线程B会直接调用队列的offer方法把taskB放入队列中,offer方法加锁后调用enqueue方法,将taskB加入队列后执行Condition.signal(),signal方法最终会调用LockSupport.unpark(线程T),即唤醒处于WAITING状态的线程T,将它的状态设置为RUNNABLE。
小结:线程池利用阻塞队列实现任务的排队等待,而阻塞队列包含ReentrantLock属性,利用AQS的条件等待队列调用LockSupport.park(this)将自己设置为WAITING状态。
提交Callable任务
线程池提供了submit()方法来提交Callable任务,实际底层是将Callable包装成FutureTask(它实现了Runnable),然后调用execute()来提交任务
- 当线程A执行execute()方法后返回,紧接着调用FutureTask.get()等待结果返回,在get()判断线程池中任务是否执行完成,是则返回结果,否则调用awaitDone(),通过LockSupport.park(this)将自己设置为WAITING状态。
- 线程池中的线程开始调用FutureTask.run()方法,run()会调用Callable.call(),它的返回结果result会被赋值给FutureTask的outcome属性,然后将任务执行状态设置为NORMAL(2)状态,并调用LockSupport.unpark(t)唤醒线程A
- 线程A唤醒之后判断任务执行状态大于COMPLETING(1),立即返回,最后返回outcome。
小结: submit()执行回调任务也是利用LockSupport来进行线程之间的值传递。
动态更新线程数
直接上代码:
public class ThreadPoolTest {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
public static void main(String[] args) throws ExecutionException, InterruptedException {
executor.execute(new RunnableTest(executor));
TimeUnit.SECONDS.sleep(5);
executor.setCorePoolSize(2);
executor.setMaximumPoolSize(3);
}
static class RunnableTest implements Runnable {
private ThreadPoolExecutor executor;
RunnableTest(ThreadPoolExecutor executor) {
this.executor = executor;
}
@Override
public void run() {
while (true) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程数:" + executor.getPoolSize());
System.out.println("核心线程数:" + executor.getCorePoolSize());
System.out.println("最大线程数:" + executor.getMaximumPoolSize());
System.out.println("活跃数量:" + executor.getActiveCount());
}
}
}
}
线程池提供了两个set方法API设置核心线程数和最大线程数
多余线程是如何结束生命周期的
线程池所谓核心线程数即线程池需要处理的任务个数小于等于核心线程数时,多余的线程会结束自己的生命周期。
回到线程保活的思路,线程在getTask()中阻塞在队列的take()方法中,而在这之前会判断当前线程池中线程数量是否大于核心线程数,小于则会阻塞在take()中,大于则调用poll(aliveTime)进行阻塞,底层会调用LockSupport.parkNanos(this, aliveTime)。在aliveTime时间范围内,如果没有多余的任务需要执行,则该线程不会再被唤醒。过了aliveTime时间后,该线程自动唤醒跳出循环执行processWorkerExit()。最终调用Thread.interrupt()方法退出。