前几天写了这个博客:
Java实现业务异步的几种方案-CSDN博客
应粉丝要求,写一下线程池细节方面的东西,在看了很多资料和讲解视频后做如下讲解:
一、线程池解决的问题
为什么有异步任务不去手动的new,而是基于线程池来做?
类比连接池,说人话就是频繁的构建和销毁线程,消耗的资源是比较大。
为了更好的控制任务执行的时机(基于硬件资源来考虑)。因为并不是说,线程数越多越好,任务在执行时,是CPU在调度线程,如果线程过多的话,CPU需要频繁的切换上下文,这种情况反而会让任务执行的效率太低。
二、线程池的核心参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
}, new ThreadPoolExecutor.AbortPolicy());
executor.execute(() -> {
System.out.println(111);
});
ThreadPoolExecutor的个七核心参数
- 核心线程数
- 最大线程数
- 最大空闲时间
- 空闲时间单位
- 线程工厂
- 工作队列
- 拒绝策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 核心线程数:当任务提交到线程池时,会优先构建核心线程。并且核心线程默认不会被销毁。干完活,等新活。
// 最大线程数:最大线程数 - 核心线程数 = 非核心线程数(临时线程数)。非核心线程在空闲了一段时间后, // 会被干掉.
// 最大空闲时间:这个就是非核心线程允许最大的空闲时间。
// 空闲时间单位:最大空闲时间的单位,可以是毫秒,秒之类的。
// 线程工厂:这个是构建线程的工厂,帮你new Thread类的。
// 工作队列:工作队列是用来存储任务的,无论是核心线程还是非核心线程,在初始化之后,后续要执行的任务
// 都是在工作队列中获取的。 // 工作线程只有在初始化的时候,会带着任务执行。除此之外,工作线程获取任务的方式都是从工作队列获取
// 拒绝策略:当核心线程都在忙,工作队列扔满了,非核心线程都在忙,此时再来任务就走拒绝策略。 // 默认的策略是抛异常~~
}
问题:如果线程池指定了2个核心线程数,现在线程池有1个核心线程,此时我提交一个任务,这个任务是交给核心线程处理,还是构建另一个核心线程呢?
这里是优先构建核心线程去处理任务,只有核心线程全部初始化完毕后,才会重复的利用核心线程处理任务。
三、线程池属性标识&线程池的状态
四、线程池的执行流程图
简单流程:
针对任务添加到队列后的操作细节:
五、源码剖析
1. 线程池的execute方法执行
// 任务提交到线程池之后的处理流程。
public void execute(Runnable command) {
// 健壮性判断
if (command == null)
throw new NullPointerException();
// 拿到线程池核心属性ctl
int c = ctl.get();
// 工作线程数 是否小于 核心线程数
if (workerCountOf(c) < corePoolSize) {
// 创建核心线程,执行任务
if (addWorker(command, true))
// 核心线程添加成功,结束
return;
// 添加核心线程失败,有并发情况,重新获取ctl,拿到最新的ctl。
c = ctl.get();
}
// 线程池状态是RUNNING,添加任务到工作队列
if (isRunning(c) && workQueue.offer(command)) {
// 任务已经放在工作队列了。
// 重新的获取了ctl
int recheck = ctl.get();
// 线程池状态不是RUNNING了,如果不是RUNNING了,就将刚刚添加进去的任务从阻塞队列移除
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略……
reject(command);
// 如果满足 workerCountOf(recheck) == 0 ,代表工作队列可能有任务,但是线程池中没工作线程
else if (workerCountOf(recheck) == 0)
// 为了避免任务饥饿,构建一个非核心的工作线程来处理工作队列中的任务
addWorker(null, false);
}
// 添加非核心线程,如果成功,直接结束
else if (!addWorker(command, false))
// 添加非核心线程失败,直接拒绝策略
reject(command);
}
2. 线程池的addWorker方法
// 添加工作线程(核心和非核心都从这走)
// core:为true,代表添加核心线程,为false,代表添加非核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// =====================判断线程池状态==================================
// 拿ctl
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
// 线程池状态是否大于等于SHUTDOWN(状态不是RUNNING)
if (rs >= SHUTDOWN &&
// 线程池状态是SHUTDOWN,工作队列不为空,并且添加的任务是null
// 此时就是在处理工作队列有任务,但是没有工作线程的情况,这个情况不能阻拦~~
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
// 线程池状态为STOP,或者线程池状态为SHUTDOWN并且工作队列没任务
// 直接告辞,不添加工作线程
return false;
for (;;) {
// =====================判断工作线程个数==================================
// 获取工作线程个数
int wc = workerCountOf(c);
// 判断线程个数是否超过最大限制
if (wc >= CAPACITY ||
// 核心线程,判断核心线程数
// 非核心线程,判断最大线程数
wc >= (core ? corePoolSize : maximumPoolSize))
// 代表超过限制了,直接告辞~
return false;
// 基于CAS,对ctl进行+1操作,
if (compareAndIncrementWorkerCount(c))
// 成功的,跳出外层for循环,走后续的添加线程逻辑
break retry;
// CAS失败,说明有并发问题
// 重新获取ctl
c = ctl.get();
// 查看线程池状态是否有变化
if (runStateOf(c) != rs)
// 状态发生变化,重新走外层for循环
continue retry;
// 如果状态没变化,重新走内层for循环,判断工作线程个数
}
}
// =====================创建工作线程==================================
// =====================启动工作线程==================================
// 启动工作线程成功了咩~~
boolean workerStarted = false;
// 创建工作线程成功了咩~~
boolean workerAdded = false;
// Worker就是工作线程
Worker w = null;
try {
// 工作线程是new出来的,任务也扔进去了。~~~
w = new Worker(firstTask);
// 拿到了创建好的工作线程的thread对象。
final Thread t = w.thread;
// 线程对象是不是空啊~~如果为空,直接告辞
// 判断线程工厂是不是有问题~~~
if (t != null) {
// 加锁~因为操作HashSet以及修改成员变量的largestPoolSize,线程不安全,加锁就安全了。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次获取线程池状态
int rs = runStateOf(ctl.get());
// 如果状态为RUNNING正常往下走
// 如果状态为SHUTDOWN,并且传入的任务为空(工作队列有任务,但是没有工作线程的情况)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 到这,状态没问题。
// 如果线程已经启动了,告辞。
// 判断线程工厂是不是有问题~~~
if (t.isAlive())
throw new IllegalThreadStateException();
// 工作线程扔HashSet里
workers.add(w);
// 记录工作线程的最大值。
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 添加工作线程成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 工作添加成功了咩
if (workerAdded) {
// 成功了就启动
t.start();
// 启动工作线程成功
workerStarted = true;
}
}
}
// 省略部分代码
return workerStarted;
}
六、如何设置最优参数呢
这个先说一下,没有最合适,只有说是比较ok的说法。
1. 代码查看服务器的核心数
要合理配置线程数首先要知道公司服务器是几核的
代码查看服务器核数:
System.out.println(Runtime.getRuntime().availableProcessors());
2. 合理线程数配置之CPU密集型
CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。
CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。
CPU密集型任务配置尽可能少的线程数量:
一般公式:CPU核数+1个线程的线程池
3. 合理线程数配置之IO密集型
IO包括:数据库交互,文件上传下载,网络传输等
方法一:
由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数*2
方法二:
IO密集型,即该任务需要大量的IO,即大量的阻塞。
在单线程上运IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。
所以在IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。
IO密集型时,大部分线程都阻塞,故需要多配置线程数:
参考公式:CPU核数 /(1 - 阻系数)
比如8核CPU:8/(1 - 0.9)=80个线程数
阻塞系数在0.8~0.9之间
总结起来:
- 服务器的CPU内核数
- cpu密集型和io密集型判断
- 系统的并发量,以及内存情况还有任务允许的延迟时间
基于前两个设置核心线程数。
- cpu密集:核心线程数和CPU基本持平
- io密集:根据IO密集情况,需要具体情况具体分析
- IO时间很长,核心线程数就应该设置的大一些。
- IO时间短,核心线程数相对来说,比IO时间长的少。
- 要根据压测得出一个比较好的结果,尽可能让CPU利用率提高。
基于第三个确认工作队列长度。