王有志,一个分享硬核 Java 技术的互金摸鱼侠
加入 Java 人的提桶跑路群:共同富裕的Java人
今天是《面霸的自我修养》第 6 篇文章,我们一起来看看面试中会问到哪些关于线程池的问题吧。
数据来源:
- 大部分来自于各机构(Java 之父,Java 继父,某灵,某泡,某客)以及各博主整理文档;
- 小部分来自于我以及身边朋友的实际经历,题目上会做出标识,并注明面试公司。
叠“BUFF”:
- 八股文通常出现在面试的第一二轮,是“敲门砖”,但仅仅掌握八股文并不能帮助你拿下 Offer;
- 由于本人水平有限,文中难免出现错误,还请大家以批评指正为主,尽量不要喷~~
线程池是什么?为什么要使用线程池?
:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
计算机中,线程的创建和销毁开销较大,频繁的创建和销毁线程会影响程序性能。利用基于池化思想的线程池来统一管理和分配线程,复用已创建的线程,避免频繁创建和销毁线程带来的资源消耗,提高系统资源的利用率。
线程池具有以下 3 点优势:
- 降低资源消耗,重复利用已经创建的线程,避免线程创建与销毁带来的资源消耗;
- 提高响应速度,接收任务时,可以通过线程池直接获取线程,避免了创建线程带来的时间消耗;
- 便于管理线程,统一管理和分配线程,避免无限制创建线程,另外可以引入线程监控机制。
Java 中如何创建线程池?
:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
Java 中可以通过 ThreadPoolExecutor 和 Executors 创建线程池。
使用 ThreadPoolExecutor 创建线程池
使用 ThreadPoolExecutor 可以创建自定义线程池,例如:
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
10,
20,
10,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadPoolExecutor.AbortPolicy()
);
了解以上代码的含义前,我们先来看 ThreadPoolExecutor 提供的的构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor 提供了 4 个构造方法,但最后都会指向含有 7 个参数的构造方法上。我们一一说明这些参数的含义:
int corePoolSize
,线程池的核心线程数量,核心线程在线程池的生命周期中不会被销毁;int maximumPoolSize
,线程池的最大线程数量,超出核心线程数量的非核心线程;long keepAliveTime
,线程存活时间,非核心线程空闲时存活的最大时间;TimeUnit unit
,keepAliveTime 的时间单位;BlockingQueue<Runnable> workQueue
,线程池的任务队列;ThreadFactory threadFactory
,线程工厂,用于创建线程,可以自定义线程;RejectedExecutionHandler handler
,拒绝策略,当任务数量超出线程池的容量(超过 maximumPoolSize 并且 workQueue 已满)时的处理策略。
使用 Executors 创建线程池
除了使用 ThreadPoolExecutor 创建线程池外,还可以通过 Executors 创建 Java 内置的线程池,Java 中提供了 6 种内置线程池:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
ExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
ExecutorService workStealingPool = Executors.newWorkStealingPool();
Tips:关于这 6 种线程池的详细解释,参考下一题。
Java 中提供了哪些线程池?
:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
Java 中提供了 6 种线程池,可以通过 Executors 获取。
FixedThreadPool
通过 Executors 创建 FixedThreadPool 的代码如下:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
FixedThreadPool 是固定线程数量的线程池,通过Executors#newFixedThreadPool
方法获得,源码如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
本质上是通过 ThreadPoolExecutor 创建的线程池,核心线程数和最大线程数相同,工作队列使用 LinkedBlockingQueue,该队列最大容量是 Integer.MAX_VALUE
。
SingleThreadExecutor
通过 Executors 创建 SingleThreadExecutor 的代码如下:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
SingleThreadExecutor 是只有一个线程的线程池,通过Executors#newSingleThreadExecutor
方法获得,其源码如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
依旧是通过 ThreadPoolExecutor 创建的线程池,最大线程数和核心线程数设置为 1,工作队列使用 LinkedBlockingQueue。SingleThreadExecutor 适合按顺序执行的场景。
ScheduledThreadPool
通过 Executors 创建 ScheduledThreadPool 的代码如下:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
ScheduledThreadPool 是具有定时调度和延迟调度能力的线程池,通过Executors#newScheduledThreadPool
方法获得,其源码如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
与前两个不同的是 ScheduledThreadPool 是用过 ScheduledThreadPoolExecutor 创建的,源码如下:
public class Executors {
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue());
}
}
追根溯源的话 ScheduledThreadPoolExecutor 依旧是通过 ThreadPoolExecutor 的构造方法创建线程池的,能够实现定时调度的特性是因为ScheduledThreadPoolExecutor#execute
方法和ScheduledThreadPoolExecutor#schedule
方法实现的:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<Void> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit), sequencer.getAndIncrement()));
delayedExecute(t);
return t;
}
}
因为 ScheduledThreadPoolExecutor 并不是线程池中的重点内容,这里我们不过多讨论源码的实现,我们接下来看 ScheduledThreadPoolExecutor 该如何使用:
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("当前时间:" + simpleDateFormat.format(new Date()));
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("执行时间:" + simpleDateFormat.format(new Date()) + ",延迟3秒执行");
}
}, 3, TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("执行时间:" + simpleDateFormat.format(new Date()) + ",每3秒执行一次");
}
}, 0, 3, TimeUnit.SECONDS);
SingleThreadScheduledExecutor
通过 Executors 创建 ScheduledExecutorService 的代码如下:
ExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
与 ScheduledThreadPool 相同 SingleThreadScheduledExecutor 也是具有定时调度和延迟调度能力的线程池,同样的 SingleThreadScheduledExecutor 也是通过 ScheduledThreadPoolExecutor 创建的,不同之处在于 ScheduledThreadPool 并不限制核心线程的数量,而 SingleThreadScheduledExecutor 只会创建一个核心线程。
CachedThreadPool
通过 Executors 创建 CachedThreadPool 的代码如下:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
CachedThreadPool 是可缓存线程的线程池,通过Executors#newSingleThreadExecutor
方法获得,其源码如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
CachedThreadPool 的特点是没有核心线程,任务提交后会创建新线程执行,并且没有最大数量限制,每个线程空闲后的存活时间是 60 秒,如果 60 秒内有新的任务提交,会复用这些线程,也就实现了线程缓存的能力,工作队列使用 SynchronousQueue,它不存储任何元素,使用 SynchronousQueue 的目的是为了能够立即创建(使用)非核心线程(涉及到 ThreadPoolExecutor 的实现原理,后文详解)执行任务。
WorkStealingPool
通过 Executors 创建 ScheduledExecutorService 的代码如下:
ExecutorService workStealingPool = Executors.newWorkStealingPool();
WorkStealingPool 是 Java 8 中加入的线程池,与之前的 5 种线程池直接或间接的回归到 ThreadPoolExecutor 不同,WorkStealingPool 是通过 ForkJoinPool 实现的(ForkJoinPool 与 ThreadPoolExecutor 都是 AbstractExecutorService 的实现),内部通过 Work-Stealing 算法并行的处理任务,但无法保证任务的执行顺序。
Executor 框架是什么?
:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
Executor 用于执行 Runnable(Callable)任务,提供了将 Runnable(Callable) 与运行机制解耦的能力,屏蔽了线程创建,调度方式等。Java 中的注释是这样描述 Executor 的:
An object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads.
Executor 的体系:
Executor 体系中 ,ExecutorService 接口对 Executor 进行了扩展,提供了管理 Executor 和查看 Executor 状态的能力。
An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.
另外,我把 Executors 也加入到了 Executor 的体系结构中,虽然没有继承或者实现的关系,但是根据 Java 中的命名规范,Executors 是作为 Executor 的工具类出现的,从类图中可以看到 Executors 提供了
线程池都有哪些状态?
:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
Java 中定义了线程池的 5 种状态:
private static final int RUNNING = -1 << COUNT_BITS; // 111 0 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 0 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS; // 001 0 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS; // 010 0 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS; // 011 0 0000 0000 0000 0000 0000 0000 0000
注释中也非常详细的解释了每个状态:
RUNNING: Accept new tasks and process queued tasks
SHUTDOWN: Don’t accept new tasks, but process queued tasks
STOP: Don’t accept new tasks, don’t process queued tasks, and interrupt in-progress tasks
TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED: terminated() has completed
- RUNNING:接收新任务,并处理队列中的任务;
- SHUTDOWN:不接收新任务,仅处理队列中的任务;
- STOP:不接收新任务,不处理队列中的任务,中断正在执行的任务;
- TIDYING:所有任务已经执行完毕,并且工作线程为 0,转换到 TIDYING 状态后将执行 Hook 方法
ThreadPoolExecutor#terminated
; - TERMINATED:
ThreadPoolExecutor#terminated
方法执行完毕,该状态表示线程池彻底终止。
另外注释中还详细的描述了线程池状态间转换的规则:
RUNNING -> SHUTDOWN On invocation of shutdown()
(RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
SHUTDOWN -> TIDYING When both queue and pool are empty
STOP -> TIDYING When pool is empty
TIDYING -> TERMINATED When the terminated() hook method has completed
- RUNNING -> SHUTDOWN:通过调用
ThreadPoolExecutor#shutdown
方法; - RUNNING 或 SHUTDOWN -> STOP:通过通过调用
ThreadPoolExecutor#shutdownNow
方法; - SHUTDOWN -> TIDYING:工作线程为 0(没有处理中的任务),并且工作队列中待处理的任务为 0 时;
- STOP -> TIDYING:工作线程为 0 时(没有处理中的任务);
- TIDYING -> TERMINATED:TIDYING 状态下,调用
ThreadPoolExecutor#terminated
方法。
最后我们通过一张图来看下线程池状态之间的转换:
🔥线程池是如何实现的?
:::info
难易程度:🔥🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:阿里巴巴,美团,蚂蚁金服
:::
我们通过一段演示代码尝试着推测线程池的执行过程,首先我们实现一个自定义的阻塞队列:
public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E> {
public CustomBlockingQueue(int capacity) {
super(capacity);
}
@Override
public boolean offer(E e) {
boolean result = super.offer(e);
if (result) {
System.out.println("添加到队列中");
}
return result;
}
}
CustomBlockingQueue 继承自 LinkedBlockingQueue,并重写了LinkedBlockingQueue#offer
方法,在元素成功添加到队列时输出一行日志,为的是能够清晰的展示线程池中任务添加到工作队列中的时机。
接着我们创建线程池:
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
3,
3,
10,
TimeUnit.SECONDS,
new CustomBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy());
线程池的容量是 9,即最大线程 6 个(核心线程 3 个,非核心线程 3 个,工作队列容量为 3),拒绝策略是 AbortPolicy,超出线程池的容量后,直接丢弃任务。
使用这个线程池同时执行 10 个任务:
for(int i = 1; i <= 10; i++) {
int finalI = i;
threadPoolExecutor.execute(() -> {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + ",开始执行任务:" + finalI);
try {
// 为了能够长时间占用线程
TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(threadName + ",结束执行任务:" + finalI);
});
// 为了能够实现按顺序提交任务
TimeUnit.SECONDS.sleep(1);
}
执行上述代码,可以看到打印的日志为:
根据执行结果,并且结合之前对于线程池参数的解释,我们就能推测出线程池的大致流程:
- 编号 1~3 的任务提交到线程池后,创建核心线程执行任务;
- 编号 4~6 的任务提交到线程池后,因为超出核心线程的数量,将任务添加到工作队列中;
- 编号 7~9 的任务提交到线程池后,因为工作队列已满,创建非核心线程执行任务;
- 编号 10 的任务提交到线程池后,因为超出线程池的容量,执行拒绝策略;
- 编号 1~3 的任务执行完毕后,核心线程空闲,取出工作队列中编号 4~6 的任务开始执行。
我们用一张图来展示上述代码中任务执行的顺序:
以上是我们通过案例来推测出的线程池工作流程,接下来我们通过源码分析来印证我们推测的结果,并梳理线程池执行过程中的具体细节。
线程池的源码分析
CTL 与线程池状态
分析线程池执行流程前,我们先来看 ThreadPoolExecutor 中定义的主控状态 CTL 和线程池状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 111 0 0000 0000 0000 0000 0000 0000 0000
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 0001 1111 1111 1111 1111 1111 1111 1111
// 线程池状态
private static final int RUNNING = -1 << COUNT_BITS; // 111 0 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 0 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS; // 001 0 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS; // 010 0 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS; // 011 0 0000 0000 0000 0000 0000 0000 0000
// 计算CTL
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
// 获取线程池状态
private static int runStateOf(int c) {
return c & ~COUNT_MASK;
}
// 获取线程池工作线程数
private static int workerCountOf(int c) {
return c & COUNT_MASK;
}
Doug Lea 采用了位掩码技术来设计 CTL,将 CTL 拆分成了 2 个部分,高 3 位表示线程池状态,低 29 位表示工作线程数量,因此线程池最多允许 536870911 个线程处于工作状态。
Java 中采用补码的形式表示数字,最高位是符号位,0 表示正数,1 表示负数,RUNNING 状态在二进制表示中最高位是 1,是负数,其余状态的数值均为正数。结合上一题的分析,我们可以得到,线程池从运行状态到终止状态,随着状态的转换,每种状态的数值表示是逐渐增大的。
Tips:如果不熟悉位运算,可以参考我的另一篇文章《编程技巧:“高端”的位运算》。
线程池的任务执行
我们从任务执行的入口ThreadPoolExecutor#execute
的源码开始:
public void execute(Runnable command) {
// 校验提交的任务
if (command == null) {
throw new NullPointerException();
}
// 获取CTL
int c = ctl.get();
// STEP 1:工作线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) {
return;
}
// addWorker方法执行失败时,重新获取CTL
c = ctl.get();
}
// STEP 2:工作线程数大于核心线程数,但可以添加到工作队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) {
reject(command);
} else if (workerCountOf(recheck) == 0) {
addWorker(null, false);
}
}
// STEP 3:工作队列中无法继续添加任务
else if (!addWorker(command, false)) {
reject(command);
}
}
ThreadPoolExecutor#execute
源码中,可以将整体的执行逻辑分为 3 步(见注释)。详细分析这 3 步前,我们先来看频繁出现的ThreadPoolExecutor#addWorker
方法,由于该方法的源码较长,我们拆开来分析。ThreadPoolExecutor#addWorker
方法的声明:
private boolean addWorker(Runnable firstTask, boolean core)
ThreadPoolExecutor#addWorker
方法提供了两个参数:
Runnable firstTask
,要执行的任务boolean core
,表示是否为核心线程
接着是ThreadPoolExecutor#addWorker
方法源码的第一部分,检查线程池的状态及线程的数量:
retry:
for (int c = ctl.get();;) {
// 检查线程池状态
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
return false;
for (;;) {
// 检查线程池的工作线程数量
if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 增加线程池工作线程数量
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
以上这部分代码是对线程池状态的检查,以及处理主控状态 CTL,我们来分析关建代码:
- 第 4 行,检查线程池“最多”处于 SHUTDOWN 状态,因为只有在 SHUTDOWN 状态和 RUNNING 状态中,才会创建线程执行任务;
- 第 8 行,根据入参
boolean core
决定创建线程数量的上限是小于 corePoolSize 还是小于 maxmunPoolSize; - 第 11 行,调用
ThreadPoolExecutor#compareAndIncrementWorkerCount
方法来增加 CTL 中保存的工作线程数量,成功后则跳出 retry 标签; - 第 13 行,如果程序运行到这里,修改 CTL 中工作线程数量失败,需要重新获取 CTL 并检查线程池状态。
注意ThreadPoolExecutor#compareAndIncrementWorkerCount
方法采用了 CAS 技术,方法返回失败说明此时 CTL 已经被其它线程修改,需要重新获取 CTL 并检查线程池状态。
最后来看ThreadPoolExecutor#addWorker
方法的第二部分源码,执行工作任务:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建Worker对象
w = new Worker(firstTask);
// 获取到Worker中创建的线程
final Thread t = w.thread;
if (t != null) {
// 使用ReentrantLock加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
// 检查线程池状态,只有RUNNING状态和STOP之下的状态允许创建线程
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
// 检查线程状态
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
// 记录线程池中出现的最大线程数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程,开始执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
以上这部分代码是创建线程并执行任务的方法,我们来分析关键部分:
- 第 6 行,创建 Worker 对象,我们可以直接将 Worker 对象与 Thread 对象画上等号,认为创建了 Worker 对象就是创建了线程。
- 第 16 行,依旧是对线程池状态的检查,这里验证了线程池状态“最多”处于 SHUTDOWN 状态,这个限制也不难理解,RUNNGING 状态是线程池的正常状态,允许创建线程并处理任务,SHUTDOWN 状态下允许处理工作队列中的线程,如果线程池中没有活跃的线程,需要创建线程来处理。
需要注意,这部分代码中出现的ThreadPoolExecutor#runStateLessThan
方法与之前出现的ThreadPoolExecutor#runStateAtLeast
方法在比较 CTL 与状态时在开闭区间上存在差异。
到这里整个ThreadPoolExecutor#addWorker
的方法也就差不多分析完了,它实际上就做了两件事:
- 检查线程池,包括状态检查,线程池中线程数量的检查;
- 创建 Worker 对象,并启动(相当于创建 Thread 对象并启动)。
回过头来,我们结合对ThreadPoolExecutor#addWorker
方法的分析,不难看出ThreadPoolExecutor#execute
方法的 3 步都做了什么:
步骤 | 线程池状态 | 工作线程数 | 队列状态 | 处理方式 |
---|---|---|---|---|
STEP 1 | RUNNING | < corePoolSize | 空 | 核心线程数未满,创建核心线程处理任务 |
STEP2 | RUNNING | = corePoolSize | 未饱和 | 核心线程数已满,将任务添加到队列中 |
STEP3 | RUNNING | >= corePoolSize | 已饱和 | 核心线程和工作队列已满,创建非核心线程处理任务,创建失败则执行拒绝策略 |
至此,我们已经能够清晰的看到线程池的执行流程了,并且也可以印证我们通过演示代码来推测的线程池执行流程的正确性了。
线程池中的线程是什么时间创建的?
:::info
难易程度:🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥
:::
:::success
面试公司:无
:::
在上一题的分析中,我们已经知道线程池的线程是在提交任务后通过ThreadPoolExecutor#addWorker
方法中创建的,具体是在 Worker 的构造方法中:
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
每个 Worker 对象中都持有一个通过 ThreadFactory 创建的线程,这也是为什么我将线程池中的线程与 Worker 对象画上等号。
除此之外,还可以调用ThreadPoolExecutor#prestartCoreThread
来预创建线程,该方法源码如下:
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);
}
从源码中可以看到,每次调用只会预创建 1 个 Worker 对象(即 1 个核心线程),如果需要创建全部的核心线程,需要多次调用。
🔥线程池中的核心线程是如何复用的?
:::info
难易程度:🔥🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:阿里巴巴,美团,蚂蚁金服
:::
线程池是通过 Worker 对象来完成线程的复用的。我们来看内部类 Worker 的类型声明:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable;
Worker 自身继承了 AQS,并实现了 Runnable 接口,说明 Worker 自身就是可被线程执行的。
接下来是 Worker 的构造方法:
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
可以看到,构造在创建线程时使用的 Runnable 对象是 Worker 对象自身,同时 Worker 对象通过成员变量 firstTask 保存了我们提交的 Runnable 对象。
回到ThreadPoolExecutor#addWorker
方法中,我们来看线程启动的部分:
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
我们已经知道,Java 中调用Thread#start
方法中,会执行Runnable#run
方法,那么在这段代码中执行的是谁的 run 方法呢?答案是 Worker 对象重写的 run 方法。
我们来看Worker#run
的源码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
public void run() {
runWorker(this);
}
}
Worker#run
方法调用了 runWorker 方法,该方法并不是内部类 Worker 的,而是 ThreadPoolExecutor 的方法,接着来看源码:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取Worker对象中封装的Runnable
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 线程池状态检查
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
// 执行任务
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
重点关注第 9 行的 while 循环,它有两个条件:
task != null
,即 Worker 对象自身保存的 Runnable 不为 null;(task = getTask()) != null
,即通过ThreadPoolExecutor#getTask
方法获取到的 Runnable 不为 null。
第一个条件不难想到,即首次提交时 Worker 会“携带” Runnable,那么第二个条件是从哪里获取到的待执行任务呢?答案是工作队列。
我们来看ThreadPoolExecutor#getTask
方法的源码:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
// 状态检查
int c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取工作线程数量,并判断是否为非核心线程
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 通过工作队列获取待执行任务
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
先来关注第 12 行中变量 timed 的赋值逻辑,有两个条件:
boolean allowCoreThreadTimeOut
,ThreadPoolExecutor 的成员变量,可以通过ThreadPoolExecutor#allowCoreThreadTimeOut
方法赋值,表示是否允许核心线程超时销毁,默认值是 false;wc > corePoolSize
,即工作线程数量是否超出核心线程的数量限制。
timed 变量关系到第 20 行代码中获取工作队列中待执行任务的方式:
- 如果允许销毁核心线程,或工作线程数量已经超出核心线程数量限制,则通过
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
的方式获取待执行任务,该方法获取队首元素,如果当前队列为空,则在等待指定时间后返回 null; - 如果不允许销毁核心线程,或工作线程数量小于核心线程数量,则通过
workQueue.take()
的方式获取待执行任务,该方法获取队首元素,如果队列为空,则进入等待,直到能够获取元素为止。
知道了以上内容后,我们再来看ThreadPoolExecutor#runWorker
方法的 while 循环,在不允许销毁核心线程的线程池中,核心线程第一次执行 Worker 自身“携带”的任务,执行完毕后再次进入 while 循环,尝试获取工作队列中的待执行任务,如果此时工作队列中没有待执行任务,则工作队列进入阻塞,此时 runWorker 方法也进入阻塞,直到工作队列能够成功返回待执行任务。
简单来说,线程池的核心线程复用,在于核心线程启动后就进入“不眠不休”的工作中,除了执行首次提交的任务外,还会不断尝试从工作队列中获取待执行任务,如果无法获取就阻塞到能够获取任务为止。
线程池的非核心线程是什么时候销毁的?
:::info
难易程度:🔥🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:无
:::
非核心线程的销毁依旧是在ThreadPoolExecutor#runWorker
方法中进行的。
我们回到ThreadPoolExecutor#getTask
方法中:
private Runnable getTask() {
// 标记是否超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
假设现在的情况是非核心线程通过ThreadPoolExecutor#getTask
方法获取工作队列中的待执行任务,而此时工作队列中已经没有待执行的任务了。
第一次进入循环时,第 11 行代码会因为wc> corePoolSize
而将 timed 赋值为 true,此时wc == maximumPoolSize
且 timedOut 为 false,并不满足第 12 行的判断条件,会直接来到第 18 行通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
的方式从工作队列中获取待执行任务,在等待一定的时间后,工作队列返回了 null,此时进入第 21 行,将 timedOut 赋值为 true 后再次进入循环。
第二次进入循环时,变量 timed 依旧被赋值为 true,与第一次进入循环不同的是,此时的 timedOut 为 ture,已经满足第 12 行的判断条件,执行 if 语句的内容,调用ThreadPoolExecutor#compareAndDecrementWorkerCount
方法减少 CTL 中工作线程的数量,并且返回 null。
重点在这个返回的 null 上,当ThreadPoolExecutor#getTask
返回 null 后,ThreadPoolExecutor#runWorker
会跳出循环,执行 finally 中的ThreadPoolExecutor#processWorkerExit
方法:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 从workers中删除worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试修改线程池状态
tryTerminate();
// 处理STOP之下的状态
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
该方法中,从 ThreadPoolExecutor 的成员变量 workers 中删除 Worker 对象后,其内部的线程也将执行完毕,随后会调用Thread#exit
方法来销毁线程,即表示线程池中该线程已经销毁。
ThreadPoolExecutor#submit 方法和 ThreadPoolExecutor#execute 方法有什么区别?
:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥
:::
:::success
面试公司:无
:::
ThreadPoolExecutor 的实现中,ThreadPoolExecutor#submit
和ThreadPoolExecutor#execute
有 4 处区别:
ThreadPoolExecutor#submit | ThreadPoolExecutor#execute | |
---|---|---|
方法定义 | ExecutorService 接口中定义 | Executor 接口中定义 |
返回值 | 有返回值 | 无返回值 |
任务类型 | Runnable 任务和 Callable 任务 | Runnable 任务 |
实现 | AbstractExecutorService 中实现 | ThreadPoolExecutor 中实现 |
Java 中提供了哪些拒绝策略?
:::info
难易程度:🔥🔥
:::
:::warning
重要程度:🔥🔥🔥
:::
:::success
面试公司:无
:::
Java 中提供了 4 种拒绝策略:
- CallerRunsPolicy:由调用
ThreadPoolExecutor#execute
方法的线程执行该任务; - AbortPolicy:直接抛出异常;
- DiscardPolicy:丢弃当前任务;
- DiscardOldestPolicy:丢弃工作队列中队首的任务,即最早加入队列中的任务,并将当前任务添加到队列中。
这 4 种拒绝策略被定义为 ThreadPoolExecutor 的内部类,源码如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}
如果 Java 内置的拒绝策略无法满足业务需求,可以自定义拒绝策略,只需要实现 RejectedExecutionHandler 接口即可。
🔥什么是阻塞队列?
:::info
难易程度:🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥
:::
:::success
面试公司:蚂蚁金服
:::
阻塞队列是一种特殊的队列,相较于普通的队列,阻塞队列提供了一个额外的功能,当队列为空时,获取队列中元素的线程会被阻塞。
Java 中提供了 7 种阻塞队列,全部是继承自 BlockingQueue 接口:
BlockingQueue 接口继承了 Queue 接口,而 Queue 接口继承了 Collection 接口。使用时,如果需要阻塞队列的能力,要选择单独定义在 BlockingQueue 接口中的方法。
以下是阻塞队列在实现不同接口方法时的差异:
操作 | 方法 | 特点 | |
---|---|---|---|
Collection 接口 | 添加元素 | boolean add(E e) | 失败时抛出异常 |
删除元素 | boolean remove(Object o) | 失败时抛出异常 | |
Queue 接口 | 入队操作 | boolean offer(E e) | 成功返回 true,失败返回 false |
出队操作 | E poll() | 成功返回出队元素,失败返回 null | |
BlockingQueue 接口 | 入队操作 | void put(E e) | 无法入队时阻塞当前线程,直到入队成功 |
boolean offer(E e, long timeout, TimeUnit unit) | 无法入队时阻塞当前线程,直到入队成功或超时 | ||
出队操作 | E take() | 队列中无元素时阻塞当前线程,直到有元素可出队 | |
E poll(long timeout, TimeUnit unit) | 队列中无元素时阻塞当前线程,直到有元素可出队或超时 |
接下来我们来看这 7 种阻塞队列的特点。
ArrayBlockingQueue
ArrayBlockingQueue 底层使用数组作为存储结构,需要指定阻塞队列的容量(数组的特点),且不具备扩容能力。除此之外,ArrayBlockingQueue 还提供了公平访问和非公平访问两种模式,默认使用非公平访问模式。
// 非公平访问的ArrayBlockingQueue
ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(1000);
// 公平访问模式的ArrayBlockingQueue
ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(1000, true);
公平访问模式指的是,在队列中无元素时阻塞的线程,会在队列可用时按照阻塞的先后顺序访问队列;而非公平访问模式下,则是随机一个线程获取访问权限。公平访问模式保证了先后顺序,但是为了维护这个先后顺序,需要付出额外的性能作为代价。
需要额外注意的是,ArrayBlockingQueue 在入队和出队操作上使用同一把锁,也就是说一个线程的入队操作,不仅会阻塞其它线程的入队操作,还会阻塞其它线程的出队操作。
LinkedBlockingQueue
LinkedBlockingQueue 底层使用单向链表作为存储结构,并且提供了默认的容量**Integer.MAX_VALUE**
,即在不指定容量时 LinkedBlockingQueue 是“无限大”的,也即是常说的无界队列。
LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<>();
LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);
LinkedBlockingQueue 内部使用两把独立的锁来控制入队和出队,这意味着入队和出队操作是相互独立的,并不会互相阻塞。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
}
PriorityBlockingQueue
PriorityBlockingQueue 底层使用数组作为存储结构,默认容量为 11,支持自动扩容,支持元素按照优先级排序,可以通过自定义实现 Comparator 接口来实现排序功能。
例如,创建 Student 类,并允许根据 studenId 进行排序:
@Getter
@Setter
public class Student implements Comparable<Student> {
private Integer studentId;
private String name;
@Override
public int compareTo(Student student) {
return this.studentId.compareTo(student.studentId);
}
public static void main(String[] args) {
PriorityBlockingQueue<Student> priorityBlockingQueue = new PriorityBlockingQueue<>();
for(int i = 0; i < 10; i++) {
Random random = new Random();
Student student = new Student();
Integer studentId = random.nextInt(1000);
student.setStudentId(studentId);
student.setName("name-"+ studentId);
priorityBlockingQueue.put(student);
}
}
}
注意,PriorityBlockingQueue 在入队后并不能完全保证队列中元素的优先级顺序,即队列中存储的元素可能是乱序的,但在出队时是按照优先级顺序出队。
DelayQueue
DelayQueue 底层使用 PriorityQueue 作为存储结构,因此 DelayQueue 具有 PriorityQueue 的特点,比如默认容量为 11,支持自动扩容,支持元素按照优先级排序。DelayQueue 自身的特点是延时获取元素,即在元素创建指定时间后才能够从队列中获取元素,为了实现这个功能,入队的元素必须实现 Delayed 接口。
我们来创建一个实现 Delayed 接口的元素:
public class DelayedElement implements Delayed {
private final Long createTime;
private final Long delayTIme;
private final TimeUnit delayTimeUnit;
public DelayedElement(long delayTime, TimeUnit delayTimeUnit) {
this.createTime = System.currentTimeMillis();
this.delayTIme = delayTime;
this.delayTimeUnit = delayTimeUnit;
}
@Override
public long getDelay(TimeUnit unit) {
long duration = createTime + TimeUnit.MILLISECONDS.convert(this.delayTIme, this.delayTimeUnit) - System.currentTimeMillis();
return unit.convert(duration, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
DelayedElement delayedElement = (DelayedElement) o;
return this.createTime.compareTo(delayedElement.createTime);
}
注意Delayed#getDelay
方法的实现,这里的返回值并不是固定的延时时间,而是期望的延时时间与当前时间的差值,只有差值小于等于 0 时该元素才能出队。这里我选择在构造方法中传入延时时间与时间单位,并记录对象的创建时间,Delayed#getDelay
方法通过 (创建时间+延时时间-当前时间)的方式计算差值。至于Comparable#compareTo
方法的实现,与 DelayQueue 的使用与其它阻塞队列并无差别。
SynchronousQueue
SynchronousQueue 不存储元素,它的每次入队操作都要对应一次出队操作,否则不能继续添加元素。
注意,SynchronousQueue 在入队成功后会阻塞线程,直到其他线程执行出队操作,同样的,出队操作时如果没有提前执行入队操作也会被阻塞,也就是说无法在一个线程中完成 SynchronousQueue 的入队操作和出队操作。
SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
synchronousQueue.put(i);
System.out.println("put-" + i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
TimeUnit.MILLISECONDS.sleep(100);
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
System.out.println("take-" + synchronousQueue.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
LinkedTransferQueue
LinkedTransferQueue 的底层使用单向链表作为存储结构,相较于其他阻塞队列 LinkedTransferQueue 还实现了 TransferQueue 接口,实现了类似于 SynchronousQueue 传递元素的功能,但与 SynchronousQueue 不同的是,LinkedTransferQueue 是能够存储元素的。
LinkedTransferQueue 实现了 TransferQueue 接口的 3 个方法:
public interface TransferQueue<E> extends BlockingQueue<E> {
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedExceptio)throws InterruptedException;
}
以上 3 个方法,在消费者等待获取元素(调用 take 方法或 poll 方法)时,会立刻将元素传递给消费者,它们 3 者的差异在于没有消费者等待获取元素时的处理方式:
- transfer 方法,该方法会阻塞线程,直到传入的元素被消费;
- tryTransfer 方法,该方法会直接返回 false,并且放弃该元素(即不会存储到队列中);
- 带有超时时间的 tryTransfer 方法,等待指定的时间,如果依旧没有消费者消费该元素,返回 false。
我们写个例子来测试LinkedTransferQueue#tryTransfer
方法:
LinkedTransferQueue<Integer> linkedTransferQueue = new LinkedTransferQueue<>();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
if (i == 3) {
linkedTransferQueue.tryTransfer(i);
} else {
linkedTransferQueue.put(i);
}
}
}).start();
TimeUnit.SECONDS.sleep(2);
for (int i = 0; i < 10; i++) {
System.out.println("take-" + linkedTransferQueue.take());
}
启动线程向 linkedTransferQueue 中添加元素,第 4 个元素调用LinkedTransferQueue#tryTransfer
来传递元素,主线程等待两秒后从 linkedTransferQueue 中取出元素,可以看到队列中并未存储第 4 个元素。
另外,如果 LinkedTransferQueue 中已经存在元素,take 方法或者 poll 方法优先获取的是队首元素,而不是通过LinkedTransferQueue#transfe
或LinkedTransferQueue#tryTransfe
传入的元素。
LinkedBlockingDeque
LinkedBlockingDeque 底层使用双向链表作为存储结构,与 LinkedBlockingQueue 相同,LinkedBlockingDeque 的默认容量为**Integer.MAX_VALUE**
,不同的是 LinkedBlockingDeque 实现了 BlockingDeque 接口,允许操作队首和队尾的元素。
LinkedBlockingDeque 的方法中,名称中含有 first 或 last 的公有方法均实现自 BlockingDeque 接口或 Deque 接口,例如:
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable {
public void addFirst(E e) {}
public void addLast(E e) {}
public boolean offerFirst(E e) {}
public boolean offerLast(E e) {}
public void putFirst(E e) throws InterruptedException{}
public void putLast(E e) throws InterruptedException {}
public E pollFirst() {}
public E pollLast() {}
public E takeFirst() throws InterruptedException {}
public E takeLast() throws InterruptedException {}
}
什么是无界队列?使用无界队列会出现什么问题?
:::info
难易程度:🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥
:::
:::success
面试公司:无
:::
无界队列指的是队列的容量是否为“无限大”,当然这并不是真正意义上无限大,而是指一个非常大的值。例如,创建 LinkedBlockingQueue 时,如果使用无参构造器,LinkedBlockingQueue 的容量会被设置为Integer.MAX_VALUE
,那么它就是无界队列。
LinkedBlockingQueue 构造方法源码如下:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
this.capacity = capacity;
last = head = new Node<E>(null);
}
}
Integer.MAX_VALUE
的值约为 21 亿,当程序异常时或并发较大的情况下,可能会无限制的向阻塞队列中添加任务,导致内存溢出。通常在设置线程池的工作队列时,需要根据具体的需求来计算出较为合适的队列容量。
另外,在 Java 的内置线程池中,部分线程池使用了 LinkedBlockingQueue 且未指定容量,如:Executors#newFixedThreadPool
和 Executors#newSingleThreadExecutor
中创建的线程池使用了未指定容量的 LinkedBlockingQueue,这两个线程池中的队列即为无界队列。这也是为什么阿里巴巴在《Java 开发手册》中提示到,不要使用 Java 内置线程池的一个原因。
🔥如何合理的设置线程池的参数?
:::info
难易程度:🔥🔥🔥🔥🔥
:::
:::warning
重要程度:🔥🔥🔥🔥🔥
:::
:::success
面试公司:阿里巴巴,美团,蚂蚁金服
:::
Java 提供了 7 个参数来实现自定义线程池:
int corePoolSize
:核心线程数量int maximumPoolSize
:最大线程数量long keepAliveTime
:非核心线程存活时间TimeUnit unit
:非核心线程存活时间的单位BlockingQueue<Runnable> workQueue
:阻塞队列ThreadFactory threadFactory
:线程工厂RejectedExecutionHandler handler
:拒绝策略
虽然参数众多,但我们关注的重点在 corePoolSize 和 maximumPoolSize 这两个参数上。
通常在 corePoolSize 的设置上,会将程序区分为 IO 密集型和 CPU 密集型进行 corePoolSize 的设置:
- CPU 密集型: C P U 核心数 + 1 CPU核心数+1 CPU核心数+1
- IO 密集型: C P U 核心数 × 2 CPU核心数\times 2 CPU核心数×2
在 IO 密集型程序的 corePoolSize 设置上,还有另一种方案:
(
线程等待时间
线程执行时间
+
1
)
×
C
P
U
核心数
(\frac{线程等待时间}{线程执行时间} + 1)\times CPU核心数
(线程执行时间线程等待时间+1)×CPU核心数。
例如,假设每个线程的执行时间为 1 秒,而线程等待获取 CPU 的时间为 2 秒,CPU 核心数为 16,那么根据以上公式可以得到:
(
2
1
+
1
)
×
16
=
48
(\frac{2}{1}+1)\times16=48
(12+1)×16=48,即将 corePoolSize 设置为 48.
IO 密集型程序允许设置较大的 corePoolSize 的原因是,CPU 可能有大量时间都在等待 IO 操作而处于空闲状态,设置较大的 corePoolSize 可以提升 CPU 的利用率,但这么做的另一个问题是,IO 操作的速度是远低于 CPU 的计算速度的, 程序的性能瓶颈会暴露在“低效”的 IO 操作上。
除了以上两种方案外,美团在Java线程池实现原理及其在美团业务中的实践中也提到了一些 corePoolSize 和 maximumPoolSize 的设置方案:
那么以上的计算方案哪个才是线程池设置的“银剑”呢?实际上,以上的方案都只能应对一些特定的场景,而程序往往是复杂多变,且不可预估的,没有哪一种理论方案可以应对所有的场景。
在设置线程池时,会根据以上理论公式预估出较为合理的初始设置,随后通过压测来调整线程池在极端场景的合理设置。当然,程序不会一直处于这种极端场景,如果有能力实现线程池的监控,可以根据实时情况调整线程池,保证程序运行的稳定性。
ThreadPoolExecutor 提供了一些方法,可以用于监控并动态调整线程池:
public class ThreadPoolExecutor extends AbstractExecutorService {
// 设置线程工厂
public void setThreadFactory(ThreadFactory threadFactory) {}
// 设置聚聚策略
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {}
// 设置核心线程数
public void setCorePoolSize(int corePoolSize) {}
// 设置最大线程数
public void setMaximumPoolSize(int maximumPoolSize) {}
// 设置非核心线程存活时间
public void setKeepAliveTime(long time, TimeUnit unit) {}
// 获取正在执行任务的大致线程数量
public int getActiveCount() {}
// 获取线程池中曾经出现过的最大的活跃线程数
public int getLargestPoolSize() {}
// 获取已经完成执行和待执行的大致任务数量
public long getTaskCount() {}
// 获取已经完成执行的大致任务数量
public long getCompletedTaskCount() {}
}
借助以上方法,并添加对执行线程的监控,可以完成线程池的动态调整,以达到线程池在任何场景下都处于最佳设置中。
参考资料
- 一文彻底了解线程池
- 编程技巧:“高端”的位运算
- Java线程池实现原理及其在美团业务中的实践
如果本文对你有帮助的话,还请多多点赞支持。如果文章中出现任何错误,还请批评指正。最后欢迎大家关注分享硬核Java技术的金融摸鱼侠王有志,我们下次再见!