作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO
联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬
私以为造轮子几乎是最好的学习方式,甚至没有之一。因为造轮子需要至少做足以下两点:
- 了解设计思想(设计层面)
- 大略看过源码(代码层面)
不了解设计,就无法把握整体。没看过代码,就无法完善细节。另外,从创作者的角度来说,直接分析源码有时太困难了,代码太多,抽象层次太深。如果可以通过造轮子,把抽象层次减少一些,采用平铺直叙的方式呈现,那么读者理解起来也就更容易些。
既然造轮子这么好,那就,拿来吧你。今天带大家造一个线程池。
设计思路
不能免俗,直接“图片来自百度”:
池化技术
大家多少听过所谓的“池化技术”,比如数据库连接池、常量池、线程池、对象池等等。池化技术是计算机世界里比较常用的、行之有效的优化手段。那么我想问你,线程池中的“池”,到底指代什么?
抛开无足轻重的小虾米,线程池中最主要的就两个:我们向Executor提交的任务、Executor自己维护的Thread。其中,线程“池”显然指代的是Thread的集合。
但和数据库连接池等一般的池化技术不同的是,ThreadPool的作用不单单是“池化”,它更重要的职责其实是“做功”,也即是执行任务。举个例子,平时我们使用数据库连接池,其实都是从池中取出一个Connection,执行完SQL后会调用重写过的close()归还Connection。但你可曾见过有人向ThreadPool讨要Thread的?它会给你吗?ThreadPool的做法是:
想要从池中拿Thread?没门儿!你不知道自己多线程知识多菜啊?小心玩火自焚。要执行任务的话,你自己把Task丢进来,哥罩着你。
也就是说,ThreadPool从一开始就没想过让你们把Thread拿走!但你们又要返回结果咋整?我返回一个FutureTask,需要结果的话,自己FutureTask#get()。但主动权还是在ThreadPool这,能不能拿到结果、是否要阻塞都是它说了算!
到这里,ExecutorService为什么要提供返回值、AbstractExecutorService为什么要引入FutureTask都说得通了!然而,大部分人觉得线程池难,并非搞不清楚线程“池”,而是不了解它是如何“做功”的,也就是说:线程池是如何执行任务的呢?
这就涉及到线程池和一般池化技术最大的不同:内化执行操作,而且是通过生产消费的模式执行任务,大家可以在后面的Demo中看到。
生产消费模型
如果往线程池不断提交任务,大致会经历4个阶段:
- 核心线程处理任务
- 任务进入任务队列等待
- 非核心线程处理任务
- 拒绝策略
特别是第二个阶段,来不及处理的任务会被暂存入workQueue(任务队列),于是典型的生产消费模型就出现了。
调用者投递Task ====> ThreadPool.workQueue ====> workerThread阻塞获取Task执行
几个重要概念
JVM的Thread与操作系统的线程资源
平时我们都是通过new Thread().start()开启一个线程,而线程本质是操作系统资源,Java作为一门编程语言,是没法自己分配线程的。那么,JVM的Thread对象和操作系统的线程资源之间是什么关系呢?大家不妨想象一个场景:
在一个风雨交加的郊外,龙虎山张天师独自一人面对各路妖孽!只见张天师缓缓从怀中掏出一张“天雷符”,指尖轻轻一捻,符咒顿时蹿起了蓝色火苗。说时迟那时快,天空忽然乌云密布发出轰鸣,然后一条紫电如龙蛇一般从天而降,劈向那群魑魅魍魉。
“天雷符”不是真正的天雷,但通过天雷符可以召唤天雷,是一种契约绑定关系,Thread和操作系统的线程同理。
线程池如何复用线程?
有时候,要解决一个问题,从反方向入手可能更简单些。我们暂且先不管如何复用线程,我就问大家:如何回收/销毁线程?(知道线程什么情况会被销毁,那么只要避免销毁,也就可以复用)
“线程”这个词,其实有两个层次的指代:Thread对象、JVM线程资源(本质还是操作系统线程)。Thread对象与线程资源之间是绑定关系,一个线程资源被分配后,会找到Thread#run()作为代码的执行入口。
线程什么时候销毁呢?正常来说,new Thread(tartget).start()后,操作系统就会分配线程资源,而等到线程执行完Thread#run()中的代码,就会自然消亡。至于Thread对象,如果没有引用,也会被GC回收。
看到这,我想大家应该明白了:只要任务永远不结束,线程就永远死不了。任务如何才能永远不结束呀?要么循环做任务、要么阻塞。
线程池本质也是Thread,只是单体和集合的区别。既然Thread“跑完任务就销毁”的特性是天生的、注定的,线程池也无法改变这一点。所以,线程池要想让内部线程一直存活着,就要keeps threads busy working,也就是让它们一直干活。实在没活干怎么办?那就阻塞着呗(可以用阻塞队列)!总之,不能让你“执行完毕”,否则就销毁了。
如何保证只销毁“非核心线程”
大家都听过一些八股文的口诀,比如“在空闲时间,如果非核心线程空闲超过keepAliveTime就会被回收”,这是怎么实现的呢?
首先,有一个常见的误区是,很多人以为线程池创建线程时会给每一个Thread做标记,比如给核心线程标记为coreThread,非核心线程标记为nonCoreThread,然后空闲时间回收nonCoreThread。然而JDK的Doug Lea可不这么想,人家采用的方案更加简单粗暴:
- 当前线程数 <= corePoolSize,那么所有线程都是核心线程,不回收
- 当前线程数 > corePoolSize,回收多余线程
看吧,超出corePoolSize时,超出的部分线程就是“多余”的,实际回收时不会做具体选择,有一个回收一个,直到线程数不大于corePoolSize。(后面有一张示意图,可以帮助理解线程是如何被回收的)
超过corePoolSize后为什么优先入队?
很多面试官喜欢问:为什么核心线程数满了以后,线程池选择优先把任务丢入队列,而不是立即分配非核心线程处理呢?
实际上,这是把因果倒置了。线程池原本就是打算用队列缓冲执行任务,但同时为了保证“弹性”,所以才允许队列满了以后扩展非核心线程加快效率。所以不要问“为什么corePoolSize满了以后要优先入队,而不是直接分配非核心线程”,而应该思考“队列满了以后,再分配非核心线程的好处是什么”。
另外,线程资源比较宝贵(特别是频繁创建销毁),所以能用队列缓冲的话,就别额外创建线程咯。
简单版TheadPool
为了降低难度,我们先写一个极简版的线程池。
public class SimpleThreadPool {
/**
* 任务队列
*/
BlockingQueue<Runnable> workQueue;
/**
* 工作线程
*/
List<Worker> workers = new ArrayList<>();
/**
* 构造器
*
* @param poolSize 线程数
* @param workQueue 任务队列
*/
SimpleThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
// 创建线程,并加入线程池
for (int i = 0; i < poolSize; i++) {
Worker work = new Worker();
work.start();
workers.add(work);
}
}
/**
* 提交任务
*
* @param command
*/
void execute(Runnable command) {
try {
// 任务队列满了则阻塞
workQueue.put(command);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 工作线程,负责执行任务
*/
class Worker extends Thread {
public void run() {
// 循环获取任务,如果任务为空则阻塞等待
while (true) {
try {
Runnable task = workQueue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
示意图
测试案例
public class SimpleThreadPoolTest {
public static void main(String[] args) {
SimpleThreadPool simpleThreadPool = new SimpleThreadPool(2, new ArrayBlockingQueue<Runnable>(2));
simpleThreadPool.execute(() -> {
System.out.println("第1个任务开始");
sleep(3);
System.out.println("第1个任务结束");
});
simpleThreadPool.execute(() -> {
System.out.println("第2个任务开始");
sleep(4);
System.out.println("第2个任务结束");
});
simpleThreadPool.execute(() -> {
System.out.println("第3个任务开始");
sleep(5);
System.out.println("第3个任务结束");
});
}
private static void sleep(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上面SimpleThreadPool的核心只有一个:生产消费模型,不涉及提交任务时的各种逻辑判断(直接加入阻塞队列),也没有非核心线程的销毁。另外,往阻塞队列提交任务时用的是put,队列满了以后会阻塞,这种“拒绝策略”显然不太合理。复杂版的ThreadPool则会在此基础上扩展,把一些细节补充完整。
复杂版ThreadPool
由于拒绝策略并不是核心逻辑,这里就直接用抛异常代替了。除此以外,代码逻辑与结构、甚至变量名都基本与ThreadPoolExecutor一致,可谓麻雀虽小五脏俱全,建议debug走读一遍:
public class ThreadPool {
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 工作线程
*/
private final List<Worker> workers = new ArrayList<>();
/**
* 任务队列
*/
private BlockingQueue<Runnable> workQueue;
/**
* 核心线程数
*/
private final int corePoolSize;
/**
* 最大线程数
*/
private final int maximumPoolSize;
/**
* 非核心线程最大空闲时间(否则销毁线程)
*/
private final long keepAliveTime;
public ThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit timeUnit,
BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = timeUnit.toNanos(keepAliveTime);
}
public void execute(Runnable task) {
Assert.notNull(task, "task is null");
// 创建核心线程处理任务
if (workers.size() < corePoolSize) {
this.addWorker(task, true);
return;
}
// 尝试加入任务队列
boolean enqueued = workQueue.offer(task);
if (enqueued) {
return;
}
// 创建非核心线程处理任务
if (!this.addWorker(task, false)) {
// 非核心线程数达到上限,触发拒绝策略
throw new RuntimeException("拒绝策略");
}
}
private boolean addWorker(Runnable task, boolean core) {
int wc = workers.size();
if (wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
boolean workerStarted = false;
try {
Worker worker = new Worker(task);
final Thread thread = worker.getThread();
if (thread != null) {
mainLock.lock();
workers.add(worker);
thread.start();
workerStarted = true;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
mainLock.unlock();
}
return workerStarted;
}
private void runWorker(Worker worker) {
Runnable task = worker.getTask();
try {
// 循环处理任务
while (task != null || (task = getTask()) != null) {
task.run();
task = null;
}
} finally {
// 从循环退出来,意味着当前线程是非核心线程,而且需要被销毁
// Java的线程,既可以指代Thread对象,也可以指代JVM线程,一个Thread对象绑定一个JVM线程
// 因此,线程的销毁分为两个维度:1.把Thread对象从workers移除 2.JVM线程执行完当前任务,会自然销毁
workers.remove(worker); // 这里前后应该加锁,否则线程不安全。由于是demo,很多处理比较随意
}
}
private Runnable getTask() {
boolean timedOut = false;
// 循环获取任务
for (; ; ) {
// 是否需要检测超时:当前线程数超过核心线程
boolean timed = workers.size() > corePoolSize;
// 需要检测超时 && 已经超时了
if (timed && timedOut) {
return null;
}
try {
// 是否需要检测超时
// 1.需要:poll阻塞获取,等待keepAliveTime,等待结束就返回,不管有没有获取到任务
// 2.不需要:take持续阻塞,直到获取结果
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
@Getter
@Setter
private class Worker implements Runnable {
private Thread thread;
private Runnable task;
public Worker(Runnable task) {
this.task = task;
thread = new Thread(this);
}
@Override
public void run() {
runWorker(this);
}
}
}
代码示意图(虚线框内由Thread异步执行):
这个图有瑕疵,实际上线程池并不区分coreThread和nonCoreThread,仅看当前线程数是否大于corePoolSize
测试案例
@Slf4j
public class ThreadPoolTest {
public static void main(String[] args) {
// 创建线程池,核心线程1,最大线程2
// 提交4个任务:第1个任务交给核心线程、第2个任务入队、第3个任务交给非核心线程、第4个任务被拒绝
ThreadPool threadPoolExecutor = new ThreadPool(
1,
2,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1)
);
threadPoolExecutor.execute(() -> {
log.info("{}:执行第1个任务...", Thread.currentThread().getName());
sleep(10);
});
sleep(1);
threadPoolExecutor.execute(() -> {
log.info("{}:执行第2个任务...", Thread.currentThread().getName());
sleep(10);
});
sleep(1);
threadPoolExecutor.execute(() -> {
log.info("{}:执行第3个任务...", Thread.currentThread().getName());
sleep(10);
});
sleep(1);
threadPoolExecutor.execute(() -> {
log.info("{}:执行第4个任务...", Thread.currentThread().getName());
sleep(10);
});
sleep(1);
log.info("main结束");
}
private static void sleep(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
大家可以把测试案例中的线程池换成JDK的ThreadPoolExecutor,执行效果很类似:
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO
进群,大家一起学习,一起进步,一起对抗互联网寒冬