说在前面
在40岁老架构师 尼恩的读者社区(50+)中,最近有小伙伴拿到了一线互联网企业如极兔、有赞、希音、百度、网易的面试资格,遇到了几个很重要的面试题:
如何设计线程池?
与之类似的、其他小伙伴遇到过的问题还有:
请手写一个简单线程池?
这个问题,很多小伙伴在社群里边反馈,都遇到过
线程池的知识,既是面试的核心知识,又是开发的核心知识。
所以,这里尼恩给大家做一下系统化、体系化的线程池梳理,使得大家可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”。
也一并把这个题目以及参考答案,收入咱们的 《尼恩Java面试宝典》V62版本,供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。
注:本文以 PDF 持续更新,最新尼恩 架构笔记、面试题 的PDF文件,请从公众号 【技术自由圈】获取。
为什么要使用线程池?
多线程编程是在开发过程中非常基础且非常重要的一个环节,基本上任何一家软件公司或者项目中都会使用多线程。主要有三个原因:
- 降低资源的消耗。降低线程创建和销毁的资源消耗。
- 提高响应速度:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间
- 提高线程的可管理性
总之:线程池是一种常用的并发编程工具,它可以帮助我们更好地管理和复用线程资源,提高程序的性能和稳定性。
线程池也是 3高架构的基础技术。
JAVA中的线程池组件
在Java中,我们可以使用java.util.concurrent包中提供的ThreadPoolExecutor类来创建和使用线程池。
ThreadPoolExecutor 是非常高频,非常常用的组件。
对于 ThreadPoolExecutor 的底层原理和源码,大家要做到非常深入的掌握。大家一定要深入看ThreadPoolExecutor线程池源码,了解其执行过程。
另外,看懂线程池执行流程和源码设计有助于提升我们多线程编程技术和解决工作中遇到的问题。
手写线程池的重要性
很多小伙伴给尼恩反馈, 说线程池的源码太难,看不懂。
怎么办呢?
大家可以先易后难。
可以手撸一个简单版的线程池加强一下对执行流程的理解。然后再深入源码去历险记。
或者说,如果我们想要更好地理解线程池的工作原理,那么自己动手实现一个简单的线程池是一个很好的选择。
接下来,我将带领大家一步一步地实现一个简单的线程池。
我们将从最基本的功能开始,逐步添加更多的功能和优化,最终实现一个完整的线程池。
线程池的实现原理
线程池是一个典型的生产者-消费者模型。下图所示为线程池的实现原理:
- 调用方不断向线程池中提交任务; (生产者)
- 线程池中有一组线程,不断地从队列中取任务,(消费者)
- 线程池管理一个任务队列,对 异步任务进行缓冲 (缓冲区)
要实现一个线程池,有几个问题需要考虑:
- 队列设置多长?
如果是无界的,调用方不断往队列中方任务,可能导致内存耗尽。如果是有界的,当队列满了之后,调用方如何处理? - 线程池中的线程个数是固定的,还是动态变化的?
- 每次提交新任务,是放入队列?还是开新线程
- 当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?
针对问题4,有3种做法:
- 不使用阻塞队列,只使用一般的线程安全的队列,也无阻塞/唤醒机制。
当队列为空时,线程池中的线程只能睡眠一会儿,然后醒来去看队列中有没有新任务到来,如此不断轮询。 - 不使用阻塞队列,但在队列外部,线程池内部实现了阻塞/唤醒机制
- 使用阻塞队列
很显然,做法3最完善,既避免了线程池内部自己实现阻塞/唤醒机制的麻烦,也避免了做法1的睡眠/轮询带来的资源消耗和延迟。
现在来带大家手写一个简单的线程池,让大家更加理解线程池的工作原理
手写一个简单线程池
第一步:定义线程池接口
首先,我们需要定义一个线程池接口,用来表示线程池应该具备哪些功能。
一个简单的线程池应该至少具备以下几个功能:
- 添加任务并执行
- 关闭线程池
- 强制关闭线程池
因此,我们可以定义一个ThreadPool接口,它包含三个方法:execute、shutdown和shutdownNow。
import java.util.List;
// 线程池接口
public interface ThreadPool {
// 提交任务到线程池
void execute(Runnable task);
// 优雅关闭
void shutdown();
//立即关闭
List<Runnable> shutdownNow();
}
其中:
- execute方法用来添加任务并执行;
- shutdown方法用来关闭线程池,它会等待已经提交到线程池中的任务都执行完毕后再关闭;
- shutdownNow方法用来强制关闭线程池,它会立即停止所有正在执行或等待执行的任务,并返回未执行的任务列表。
第二步:实现线程的池化管理
接下来,我们需要实现一个简单的线程池类,它实现了ThreadPool接口,并提供了基本的功能。
为了简化代码,先实现一个v1版本,这是一个基础版本,一个简单的实现示例。
在v1版本中,我们先不考虑拒绝策略、自动调节线程资源等高级功能。
下面是一个简单的实现示例:
首先定义一个工作线程类:
// 定义一个工作线程类
public class WorkerThread extends Thread {
// 用于从任务队列中取出并执行任务
private BlockingQueue<Runnable> taskQueue;
// 构造方法,传入任务队列
public WorkerThread(BlockingQueue<Runnable> taskQueue) {
this.taskQueue = taskQueue;
}
// 重写run方法
@Override
public void run() {
// 循环执行,直到线程被中断
while (!Thread.currentThread().isInterrupted() && !taskQueue.isEmpty()) {
try {
// 从任务队列中取出一个任务,如果队列为空,则阻塞等待
Runnable task = taskQueue.take();
// 执行任务
task.run();
} catch (Exception e) {
e.printStackTrace();
// 如果线程被中断,则退出循环
break;
}
}
}
}
然后, 基于一个线程池接口,实现一个简单的线程池,
// 简单的线程池实现
public class SimpleThreadPool implements ThreadPool {
// 线程池初始化时的线程数量
private int initialSize;
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 用于存放和管理工作线程的集合
private List<WorkerThread> threads;
// 是否已经被shutdown标志
private volatile boolean isShutdown = false;
public SimpleThreadPool(int initialSize) {
this.initialSize = initialSize;
taskQueue = new LinkedBlockingQueue<>();
threads = new ArrayList<>(initialSize);
// 初始化方法,创建一定数量的工作线程,并启动它们
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread(taskQueue);
workerThread.start();
threads.add(workerThread);
}
}
// 实现execute方法,用于将任务加入到任务队列,并通知工作线程来执行
@Override
public void execute(Runnable task) {
if (isShutdown) {
throw new IllegalStateException("ThreadPool is shutdown");
}
taskQueue.offer(task);
}
// 关闭线程池, 等待所有线程执行完毕
@Override
public void shutdown() {
// 修改状态
isShutdown = true;
for (WorkerThread thread : threads) {
// 中断线程
thread.interrupt();
}
}
@Override
public List<Runnable> shutdownNow() {
// 修改状态
isShutdown = true;
// 清空队列
List<Runnable> remainingTasks = new ArrayList<>();
taskQueue.drainTo(remainingTasks);
// 中断所有线程
for (WorkerThread thread : threads) {
thread.interrupt();
}
// 返回未执行任务集合
return remainingTasks;
}
}
这个版本的线程池实现了基本的添加任务并执行、关闭线程池和强制关闭线程池等功能。
它在构造方法中接收一个初始化线程池大小参数,用于初始化任务队列和工作线程集合,并创建一定数量的工作线程。
第三步:自定义线程池的基本参数
在上一步中,我们实现了一个简单的线程池,它具备了基本的功能。
但是,它存在一个问题:任务队列没有指定容量大小,是个无界队列,其次只指定了初始的线程池大小,应该要提供根据不同的应用场景来调整线程池的大小参数,以提高性能和资源利用率。
因此线程池实现类需要实现自定义初始大小、最大大小以及核心大小的功能。
- 初始大小是指线程池初始化时创建的工作线程数量
- 最大大小是指线程池能够容纳的最多的工作线程数量
- 核心大小是指线程池在没有任务时保持存活的工作线程数量。
这三个参数需要在基本的线程池实现类中定义为成员变量,并在构造方法中传入并赋值。
同时,还需要在execute方法中根据这三个参数来动态地调整工作线程的数量,例如:
- 当活跃的工作线程数量小于核心大小时,尝试创建并启动一个新的工作线程来执行任务;
- 当活跃的工作线程数量大于等于核心大小时,将任务加入到任务队列,等待空闲的工作线程来执行;
- 当任务队列已满时,尝试创建并启动一个新的工作线程来执行任务,
- 当活跃的工作线程数量达到最大大小时,无法再创建新的工作线程。
我们还需要在构造方法里提供一个参数queueSize,用于限制队列大小。
下面我们就对类进行改造:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class SimpleThreadPool implements ThreadPool {
// 线程池初始化时的线程数量
private int initialSize;
// 线程池最大线程数量
private int maxSize;
// 线程池核心线程数量
private int coreSize;
// 队列大小
private int queueSize;
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 用于存放和管理工作线程的集合
private List<WorkerThread> threads;
// 是否已经被shutdown标志
private volatile boolean isShutdown = false;
public SimpleThreadPool(int initialSize, int maxSize, int coreSiz, int queueSizee) {
// 初始化参数
this.initialSize = initialSize;
this.maxSize = maxSize;
this.coreSize = coreSize;
taskQueue = new LinkedBlockingQueue<>(queueSize);
threads = new ArrayList<>(initialSize);
// 初始化方法,创建一定数量的工作线程,并启动它们
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread();
workerThread.start(taskQueue);
threads.add(workerThread);
}
}
@Override
public void execute(Runnable task) {
if (isShutdown) {
throw new IllegalStateException("ThreadPool is shutdown");
}
// 当线程数量小于核心线程数时,创建新的线程
if (threads.size() < coreSize) {
addWorkerThread(task);
} else if (!taskQueue.offer(task)) {
// 当队列已满时,且线程数量小于最大线程数量时,创建新的线程
if (threads.size() < maxSize) {
addWorkerThread(task);
} else {
throw new IllegalStateException("执行任务失败");
}
}
}
// 创建新的线程,并执行任务
private void addWorkerThread(Runnable task) {
WorkerThread workerThread = new WorkerThread();
workerThread.start(taskQueue);
threads.add(workerThread);
// 任务放入队列
taskQueue.offer(task);
}
省略其它代码
}
这一步,我们在 SimpleThreadPool里新增了initialSize,maxSize, coreSize 三个变量,在构造方法里传入对应三个参数,同时在execute方法里,当有任务进入时,先判断当前线程池数量是否满足不同条件,进而执行不同的处理逻辑。
第四步:设计饱和拒绝策略
这个功能是为了处理当任务队列已满且无法再创建新的工作线程时,也是就线程池的工作量饱和时,如何处理被拒绝的任务。
不同的场景可能需要不同的拒绝策略,例如
- 直接抛出异常
- 忽略任务
- 阻塞当前线程
- 等等
为了让用户可以自定义拒绝策略,需要
- 定义一个拒绝策略接口,声明一个方法,用于处理被拒绝的任务。
- 然后需要在基本的线程池实现类中定义一个拒绝策略成员变量,并在构造方法中传入并赋值。
- 最后,在execute方法中,在无法创建新的工作线程时,调用拒绝策略来处理该任务。
下面是一个简单的实现示例,
我们首先定义了一个RejectedExecutionHandler接口,用来表示拒绝策略。用户可以根据需要实现这个接口,并在构造线程池时传入自己的拒绝策略。
public interface RejectedExecutionHandler {
// 参数:r 代表被拒绝的任务,executor 代表线程池对象
void rejectedExecution(Runnable r, ThreadPool executor);
}
我们再实现一个直接抛出异常的拒绝策略
// 直接抛出异常的拒绝策略
public class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPool executor) {
throw new RuntimeException("The thread pool is full and the task queue is full.");
}
}
我们也可以实现一个丢弃策略:
// 忽略任务的拒绝策略
public class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPool executor) {
// do nothing
System.out.println("Task rejected: " + r);
}
}
接下来,我们再优化SimpleThreadPool类,
public class SimpleThreadPool implements ThreadPool {
// 线程池初始化时的线程数量
private int initialSize;
// 线程池最大线程数量
private int maxSize;
// 线程池核心线程数量
private int coreSize;
// 队列大小
private int queueSize;
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 用于存放和管理工作线程的集合
private List<WorkerThread> threads;
// 是否已经被shutdown标志
private volatile boolean isShutdown = false;
// 默认的拒绝策略
private final static RejectedExecutionHandler DEFAULT_REJECT_HANDLER = new AbortPolicy();
// 拒绝策略成员变量
private final RejectedExecutionHandler rejectHandler;
public SimpleThreadPool(int initialSize, int maxSize, int coreSize, int queueSize) {
this(initialSize, maxSize, coreSize, queueSize, DEFAULT_REJECT_HANDLER);
}
public SimpleThreadPool(int initialSize, int maxSize, int coreSize , int queueSize, RejectedExecutionHandler rejectHandler) {
System.out.printf("初始化线程池: initialSize: %d, maxSize: %d, coreSize: %d%n", initialSize, maxSize, coreSize);
// 初始化参数
this.initialSize = initialSize;
this.maxSize = maxSize;
this.coreSize = coreSize;
taskQueue = new LinkedBlockingQueue<>(queueSize);
threads = new ArrayList<>(initialSize);
this.rejectHandler = rejectHandler;
// 初始化方法,创建一定数量的工作线程,并启动它们
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread(taskQueue);
workerThread.start();
threads.add(workerThread);
}
}
@Override
public void execute(Runnable task) {
System.out.printf("添加任务: %s%n", task.toString());
if (isShutdown) {
throw new IllegalStateException("ThreadPool is shutdown");
}
// 当线程数量小于核心线程数时,创建新的线程
if (threads.size() < coreSize) {
addWorkerThread(task);
System.out.printf("创建新的线程: thread count: %d, number of queues: %d%n", threads.size(), taskQueue.size());
} else if (!taskQueue.offer(task)) {
// 当队列已满时,且线程数量小于最大线程数量时,创建新的线程
if (threads.size() < maxSize) {
addWorkerThread(task);
System.out.printf("创建新的线程: thread count: %d, number of queues: %d%n", threads.size(), taskQueue.size());
} else {
//使用拒绝策略
rejectHandler.rejectedExecution(task, this);
}
}
}
// 省略其它代码
}
这个版本的线程池在构造方法中新增了一个handler参数,用来表示拒绝策略。当任务队列已满时,它会调用handler的rejectedExecution方法来处理被拒绝的任务。
第五步:性能优化之:自动调节线程资源
到目前为止,我们已经实现了一个简单但功能完备的线程池。
但是,它还有很多可以优化和扩展的地方。
例如,可以添加自动调节线程资源的功能,为啥需要 自动调节 线程资源呢?
因为线程资源,是非常昂贵的。
自动调节 线程资源功能是为了让线程池可以根据任务的变化,动态地增加或减少工作线程的数量,以提高性能和资源利用率。
为了实现这个功能,需要在基本的线程池实现类中定义一个空闲时长成员变量,并在构造方法中传入并赋值。
空闲时长是指当工作线程没有任务执行时,可以保持存活的时间。
如果超过这个时间还没有新的任务,那么工作线程就会自动退出。
同时,还需要在工作线程类中定义一个空闲开始时间成员变量,并在run方法中更新它。
空闲开始时间是指当工作线程从任务队列中取出一个任务后,上一次取出任务的时间。
如果当前时间减去空闲开始时间大于空闲时长,那么工作线程就会自动退出。
ok,那么我们继续改进线程池,
public class SimpleThreadPool implements ThreadPool {
// 省略其它代码
// 线程空闲时长
private long keepAliveTime;
public SimpleThreadPool(int initialSize, int maxSize, int coreSize, int queueSize, long keepAliveTime) {
this(initialSize, maxSize, coreSize, queueSize, keepAliveTime, DEFAULT_REJECT_HANDLER);
}
public SimpleThreadPool(int initialSize, int maxSize, int coreSize, int queueSize, long keepAliveTime, RejectedExecutionHandler rejectHandler) {
System.out.printf("初始化线程池: initialSize: %d, maxSize: %d, coreSize: %d%n", initialSize, maxSize, coreSize);
// 初始化参数
this.initialSize = initialSize;
this.maxSize = maxSize;
this.coreSize = coreSize;
taskQueue = new LinkedBlockingQueue<>(queueSize);
threads = new ArrayList<>(initialSize);
this.rejectHandler = rejectHandler;
this.keepAliveTime = keepAliveTime;
// 初始化方法,创建一定数量的工作线程,并启动它们
for (int i = 0; i < initialSize; i++) {
// 传入相关参数到工作线程
WorkerThread workerThread = new WorkerThread(keepAliveTime, taskQueue, threads);
workerThread.start();
threads.add(workerThread);
}
}
// 省略其它代码
}
然后改造工作线程WorkerThread:
// 定义一个工作线程类
public class WorkerThread extends Thread {
private List<WorkerThread> threads;
// 空闲时长
private long keepAliveTime;
// 用于从任务队列中取出并执行任务
private BlockingQueue<Runnable> taskQueue;
// 构造方法,传入任务队列
public WorkerThread(long keepAliveTime, BlockingQueue<Runnable> taskQueue, List<WorkerThread> threads) {
this.keepAliveTime = keepAliveTime;
this.taskQueue = taskQueue;
this.threads = threads;
}
// 重写run方法
@Override
public void run() {
long lastActiveTime = System.currentTimeMillis();
// 循环执行,直到线程被中断
Runnable task;
while (!Thread.currentThread().isInterrupted() && !taskQueue.isEmpty()) {
try {
// 从任务队列中取出一个任务,如果队列为空,则阻塞等待
task = taskQueue.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (task != null) {
task.run();
System.out.printf("WorkerThread %d, current task: %s%n", Thread.currentThread().getId(), task.toString());
lastActiveTime = System.currentTimeMillis();
} else if (System.currentTimeMillis() - lastActiveTime >= keepAliveTime) {
// 从线程池中移除
threads.remove(this);
System.out.printf("WorkerThread %d exit %n", Thread.currentThread().getId());
break;
}
} catch (Exception e) {
// 从线程池中移除
threads.remove(this);
e.printStackTrace();
// 如果线程被中断,则退出循环
break;
}
}
}
}
在WorkerThread类run方法里,采用taskQueue.poll
方法指定等待时长,这里是线程退出的关键。
如果超时未获取到任务,则表明当前线程长时间未处理任务,可以正常退出,并从线程池里移除该线程。
手写一个简单线程池的完整代码
下面,我们给出完整的所有代码:
拒绝策略相关类:
// 拒绝策略接口
public interface RejectedExecutionHandler {
// 参数:r 代表被拒绝的任务,executor 代表线程池对象
void rejectedExecution(Runnable r, ThreadPool executor);
}
// 忽略任务的拒绝策略
public class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPool executor) {
// do nothing
RunnableWrapper wrapper = (RunnableWrapper) r;
System.out.println("Task rejected: " + wrapper.getTaskId());
}
}
为了通过输出日志,清晰的展现线程池中任务的运行流程,新增了RunnableWrapper用于记录taskId,方便日志监控。
public class RunnableWrapper implements Runnable{
private final Integer taskId;
public RunnableWrapper(Integer taskId) {
this.taskId = taskId;
}
public Integer getTaskId() {
return this.taskId;
}
@Override
public void run() {
System.out.println("Task " + taskId + " is running.");
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
// ignore
}
System.out.println("Task " + taskId + " is completed.");
}
}
线程池接口
// 线程池接口
public interface ThreadPool {
// 提交任务到线程池
void execute(Runnable task);
// 优雅关闭
void shutdown();
//立即关闭
List<Runnable> shutdownNow();
}
工作线程类:
// 定义一个工作线程类
public class WorkerThread extends Thread {
private List<WorkerThread> threads;
// 空闲时长
private long keepAliveTime;
// 用于从任务队列中取出并执行任务
private BlockingQueue<Runnable> taskQueue;
// 构造方法,传入任务队列
public WorkerThread(long keepAliveTime, BlockingQueue<Runnable> taskQueue, List<WorkerThread> threads) {
this.keepAliveTime = keepAliveTime;
this.taskQueue = taskQueue;
this.threads = threads;
}
// 重写run方法
@Override
public void run() {
long lastActiveTime = System.currentTimeMillis();
// 循环执行,直到线程被中断
Runnable task;
while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {
try {
// 从任务队列中取出一个任务,如果队列为空,则阻塞等待
task = taskQueue.poll(keepAliveTime, TimeUnit.MILLISECONDS);
RunnableWrapper wrapper = (RunnableWrapper) task;
if (task != null) {
System.out.printf("WorkerThread %s, poll current task: %s%n", Thread.currentThread().getName(), wrapper.getTaskId());
task.run();
lastActiveTime = System.currentTimeMillis();
} else if (System.currentTimeMillis() - lastActiveTime >= keepAliveTime) {
// 从线程池中移除
threads.remove(this);
System.out.printf("WorkerThread %s exit %n", Thread.currentThread().getName());
break;
}
} catch (Exception e) {
// 从线程池中移除
System.out.printf("WorkerThread %s occur exception%n", Thread.currentThread().getName());
threads.remove(this);
e.printStackTrace();
// 如果线程被中断,则退出循环
break;
}
}
}
}
简单线程池实现类
public class SimpleThreadPool implements ThreadPool {
// 线程池初始化时的线程数量
private int initialSize;
// 线程池最大线程数量
private int maxSize;
// 线程池核心线程数量
private int coreSize;
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 用于存放和管理工作线程的集合
private List<WorkerThread> threads;
// 是否已经被shutdown标志
private volatile boolean isShutdown = false;
// 默认的拒绝策略
private final static RejectedExecutionHandler DEFAULT_REJECT_HANDLER = new AbortPolicy();
// 拒绝策略成员变量
private final RejectedExecutionHandler rejectHandler;
// 线程空闲时长
private long keepAliveTime;
public SimpleThreadPool(int initialSize, int maxSize, int coreSize, int queueSize, long keepAliveTime) {
this(initialSize, maxSize, coreSize, queueSize, keepAliveTime, DEFAULT_REJECT_HANDLER);
}
public SimpleThreadPool(int initialSize, int maxSize, int coreSize, int queueSize, long keepAliveTime, RejectedExecutionHandler rejectHandler) {
System.out.printf("初始化线程池: initialSize: %d, maxSize: %d, coreSize: %d%n", initialSize, maxSize, coreSize);
// 初始化参数
this.initialSize = initialSize;
this.maxSize = maxSize;
this.coreSize = coreSize;
taskQueue = new LinkedBlockingQueue<>(queueSize);
threads = new ArrayList<>(initialSize);
this.rejectHandler = rejectHandler;
this.keepAliveTime = keepAliveTime;
// 初始化方法,创建一定数量的工作线程,并启动它们
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread(keepAliveTime, taskQueue, threads);
workerThread.start();
threads.add(workerThread);
}
}
@Override
public void execute(Runnable task) {
if (isShutdown) {
throw new IllegalStateException("ThreadPool is shutdown");
}
RunnableWrapper wrapper = (RunnableWrapper) task;
System.out.printf("put task: %s %n" , wrapper.getTaskId());
// 当线程数量小于核心线程数时,创建新的线程
if (threads.size() < coreSize) {
addWorkerThread(task);
System.out.printf("小于核心线程数,创建新的线程: thread count: %d, queue remainingCapacity : %d%n", threads.size(), taskQueue.remainingCapacity());
} else if (!taskQueue.offer(task)) {
// 当队列已满时,且线程数量小于最大线程数量时,创建新的线程
if (threads.size() < maxSize) {
addWorkerThread(task);
System.out.printf("队列已满, 创建新的线程: thread count: %d, queue remainingCapacity : %d%n", threads.size(), taskQueue.remainingCapacity());
} else {
rejectHandler.rejectedExecution(task, this);
}
} else {
System.out.printf("任务放入队列: thread count: %d, queue remainingCapacity : %d%n", threads.size(), taskQueue.remainingCapacity());
}
}
// 创建新的线程,并执行任务
private void addWorkerThread(Runnable task) {
WorkerThread workerThread = new WorkerThread(keepAliveTime, taskQueue, threads);
workerThread.start();
threads.add(workerThread);
// 任务放入队列
try {
taskQueue.put(task);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 关闭线程池, 等待所有线程执行完毕
@Override
public void shutdown() {
System.out.printf("shutdown thread, count: %d, queue remainingCapacity : %d%n", threads.size(), taskQueue.remainingCapacity());
// 修改状态
isShutdown = true;
for (WorkerThread thread : threads) {
// 中断线程
thread.interrupt();
}
}
@Override
public List<Runnable> shutdownNow() {
System.out.printf("shutdown thread now, count: %d, queue remainingCapacity : %d%n", threads.size(), taskQueue.remainingCapacity());
// 修改状态
isShutdown = true;
// 清空队列
List<Runnable> remainingTasks = new ArrayList<>();
taskQueue.drainTo(remainingTasks);
// 中断所有线程
for (WorkerThread thread : threads) {
thread.interrupt();
}
// 返回未执行任务集合
return remainingTasks;
}
}
第六步:验证线程池
接下来,我们编写一个测试用例来验证我们的线程池是否能正常运行。
最后,我们编写了一个测试用例SimpleThreadPoolTest,
// 定义一个测试用例类
public class SimpleThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
SimpleThreadPool threadPool = new SimpleThreadPool(1, 4, 2, 3, 2000, new DiscardPolicy());
for (int i = 0; i < 10; i++) {
threadPool.execute(new RunnableWrapper(i));
}
Thread.sleep(10_000);
threadPool.shutdown();
}
}
这个测试用例创建了一个拥有1个初始线程、最多4个线程、核心线程数为2、队列长度为3,空闲线程保留时间为2000毫秒的线程池。
它使用了一个简单的拒绝策略,当任务被拒绝时,它会打印一条消息。
然后,测试用例向线程池中提交了10个简单的任务,每个任务都会打印一条消息,然后睡眠100毫秒,再打印一条消息。
最后,测试用例调用了shutdown方法来关闭线程池。
当我们运行这个测试用例时,会看到类似下面的输出:
初始化线程池: initialSize: 1, maxSize: 4, coreSize: 2
put task: 0
小于核心线程数,创建新的线程: thread count: 2, queue remainingCapacity : 2
put task: 1
WorkerThread Thread-1, poll current task: 0
WorkerThread Thread-0, poll current task: 1
Task 1 is running.
任务放入队列: thread count: 2, queue remainingCapacity : 2
put task: 2
Task 0 is running.
任务放入队列: thread count: 2, queue remainingCapacity : 2
put task: 3
任务放入队列: thread count: 2, queue remainingCapacity : 1
put task: 4
任务放入队列: thread count: 2, queue remainingCapacity : 0
put task: 5
WorkerThread Thread-2, poll current task: 2
Task 2 is running.
队列已满, 创建新的线程: thread count: 3, queue remainingCapacity : 0
put task: 6
WorkerThread Thread-3, poll current task: 3
Task 3 is running.
队列已满, 创建新的线程: thread count: 4, queue remainingCapacity : 0
put task: 7
Task rejected: 7
put task: 8
Task rejected: 8
put task: 9
Task rejected: 9
Task 2 is completed.
Task 1 is completed.
WorkerThread Thread-2, poll current task: 4
Task 4 is running.
Task 0 is completed.
Task 3 is completed.
WorkerThread Thread-1, poll current task: 6
WorkerThread Thread-0, poll current task: 5
Task 6 is running.
Task 5 is running.
Task 5 is completed.
Task 6 is completed.
Task 4 is completed.
WorkerThread Thread-3 exit
WorkerThread Thread-1 exit
WorkerThread Thread-0 exit
WorkerThread Thread-2 exit
shutdown thread, count: 0, queue remainingCapacity : 3
从输出中可以看出,线程池中最多有4个线程在同时运行。当有空闲线程时,新提交的任务会被立即执行;否则,新提交的任务会被添加到任务队列中等待执行。
ok,到了这里,大家能够帮助大家更好地理解上文中实现的简单线程池。
接下来,大家可以去进一步技术深入,去研究线程池的源码了。
实操总结
通过实操,我们一步一步地实现了一个简单的线程池。
我们从最基本的功能开始,逐步添加了拒绝策略、自动调节线程资源等高级功能,并对线程池进行了优化。
通过这个过程,我们可以更好地理解线程池的工作原理和实现细节。
当然,这只是一个简单的示例,实际应用中的线程池可能会更加复杂和强大。
上的实操,与Java标准库中提供的线程池相比,仍然存在一些不足之处。
例如:
- 没有提供足够的构造参数和方法,让用户可以更好地控制和监控线程池的行为。
- 没有提供足够多的拒绝策略,让用户可以根据不同的场景选择不同的拒绝策略。
- 没有提供定时任务和周期性任务的执行功能。
- 没有提供足够完善的错误处理机制。
Java标准库中提供的线程池实现(如ThreadPoolExecutor类),则在这些方面都做得更好。
- 它提供了丰富的构造参数和方法,让用户可以更好地控制和监控线程池的行为;
- 它提供了多种拒绝策略,让用户可以根据不同的场景选择不同的拒绝策略;
- 它还提供了ScheduledThreadPoolExecutor类,用来执行定时任务和周期性任务;
- 它还提供了完善的错误处理机制,可以帮助用户更好地处理异常情况。
如果要深入了解Java标准库中提供的线程池实现,可以参考清华大学出版社所出版的尼恩 Java高并发三部曲之二 《Java高并发核心编程 卷2 加强版》。
此书广受好评,写得非常细致,这里不做赘述。
作者介绍:
本文1作: Andy,资深架构师, 《Java 高并发核心编程 加强版》作者之1 。
本文2作: 尼恩,40岁资深老架构师, 《Java 高并发核心编程 加强版 卷1、卷2、卷3》创世作者, 著名博主 。 《K8S学习圣经》《Docker学习圣经》等11个PDF 圣经的作者。
参考文献:
https://www.cnblogs.com/daoqidelv/p/7043696.html
清华大学出版社《Java高并发核心编程 卷2 加强版》
《队列之王 Disruptor 红宝书》
技术自由的实现路径 PDF:
实现你的 架构自由:
《吃透8图1模板,人人可以做架构》
《10Wqps评论中台,如何架构?B站是这么做的!!!》
《阿里二面:千万级、亿级数据,如何性能优化? 教科书级 答案来了》
《峰值21WQps、亿级DAU,小游戏《羊了个羊》是怎么架构的?》
《100亿级订单怎么调度,来一个大厂的极品方案》
《2个大厂 100亿级 超大流量 红包 架构方案》
… 更多架构文章,正在添加中
实现你的 响应式 自由:
《响应式圣经:10W字,实现Spring响应式编程自由》
这是老版本 《Flux、Mono、Reactor 实战(史上最全)》
实现你的 spring cloud 自由:
《Spring cloud Alibaba 学习圣经》
《分库分表 Sharding-JDBC 底层原理、核心实战(史上最全)》
《一文搞定:SpringBoot、SLF4j、Log4j、Logback、Netty之间混乱关系(史上最全)》
实现你的 linux 自由:
《Linux命令大全:2W多字,一次实现Linux自由》
实现你的 网络 自由:
《TCP协议详解 (史上最全)》
《网络三张表:ARP表, MAC表, 路由表,实现你的网络自由!!》
实现你的 分布式锁 自由:
《Redis分布式锁(图解 - 秒懂 - 史上最全)》
《Zookeeper 分布式锁 - 图解 - 秒懂》
实现你的 王者组件 自由:
《队列之王: Disruptor 原理、架构、源码 一文穿透》
《缓存之王:Caffeine 源码、架构、原理(史上最全,10W字 超级长文)》
《缓存之王:Caffeine 的使用(史上最全)》
《Java Agent 探针、字节码增强 ByteBuddy(史上最全)》
实现你的 面试题 自由:
4000页《尼恩Java面试宝典 》 40个专题
以上尼恩 架构笔记、面试题 的PDF文件更新,请到《技术自由圈》公号获取↓↓↓