目录
1 java构建线程的方式
2 线程池的7个参数
3 线程池属性标识&线程池的状态
3.1 核心属性
3.2 线程池的状态
4 线程池的执行流程
5 添加工作线程的流程
6 Worker的封装&后续任务的处理
1 java构建线程的方式
一般就3~4种:
- 继承Thread:发现Thread类已经实现了Runnable,继承Thread类时,本质也是间接的实现了Runnable
- 实现Runnable:(本质只有Runnable这种方式)
- 实现Callable:手动去实现话,将Callable的实现传入到FutureTask,讲FutureTask传入Thread的有参构造。发现FutureTask本身也间接的实现的Runnable接口。
- 线程池:线程池构建工作线程时,构建的Worker对象,Worker也实现了Runnabl接口。
本质只有1种:实现Runnable接口
2 线程池的7个参数
为什么要用线程池?(玩多线程的目的是为了充分发挥硬件资源的性能。)
- 多线程场景:接口优化(让之前串行处理的任务,现在可以并行处理加快速度。)
- 管理线程(控制线程数量):线程也要占用内存资源,线程需要被CPU调度,如果线程数量不好好控制,反而会造成业务处理速度变慢。
- 重复利用(重复创建问题):每次线程的构建都需要分配内存资源,用完之后,还要归还内存资源,成本很高,利用连接池的思想,构件好反复用就ok。
JDK其实提供了Executors的工具类,可以去很方便的构建一些常用的线程池。BUT,因为Executors封装好之后,很多参数无法主动去设置,不能更好的去管理线程池。
使用线程池,强烈推荐手动构建ThreadPoolExecutor。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// corePoolSize:核心线程数(核心线程默认情况下不会被干掉,一直在线程池里)
// maximumPoolSize:最大线程数(核心线程数 + 非核心线程数)
// keepAliveTime:非核心线程的最大空闲时间。
// unit:最大空闲时间的时间单位
// workQueue:首先有活来了,先让核心线程处理,核心线程处理不过来了,先堆workQueue里。实在处理不过来。临时
构建非核心线程,来处理workQueue中的任务。
// threadFactory:构建thread对象的一个工厂。
// handler:拒绝策略,线程没有空闲,阻塞队列也放满了,这时,再来任务,就拒绝滴干活~
}
3 线程池属性标识&线程池的状态
3.1 核心属性
线程池中核心属性是AtomicInteger 类型的ctl(可以理解为线程安全的int类型)。ctl的高3位维护了线程池状态,低29位维护了工作线程个数。
// 线程池的核心属性,ctl,
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//可以看做:private int ctl = 0;
// ctl属性标识了线程池的两个信息
// int是一个32个bit位的数值
// 其中高3位,标识线程池状态。 低29位,标识工作线程个数(工作线程的最大值:2^29-1)
// Integer.SIZE ,是获取int类型占用的位数。 Integer.SIZE = 32
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_BITS = 29;
// CAPACITY 线程池中最大的工作线程个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
00000000 00000000 00000000 00000001(1)
00100000 00000000 00000000 00000000(1 << 29)
00011111 11111111 11111111 11111111(1 << 29) - 1
3.2 线程池的状态
RUNNING:一切正常,任务正常处理
SHUTDOWN:不接收新任务,但是正在处理的任务要完成,阻塞队列的任务要处理完毕。
STOP:不接收新任务,中断正在处理的任务,阻塞队列的任务全部丢掉。
// 线程池的5个状态
111
private static final int RUNNING = -1 << COUNT_BITS;
000
private static final int SHUTDOWN = 0 << COUNT_BITS;
001
private static final int STOP = 1 << COUNT_BITS;
010
private static final int TIDYING = 2 << COUNT_BITS;
011
private static final int TERMINATED = 3 << COUNT_BITS;
4 线程池的执行流程
分析执行流程,要查看execute方法。
// 提交任务到线程池的方法
public void execute(Runnable command) {
// 非空判断,没啥说的。
if (command == null)
throw new NullPointerException();
// 获取核心属性。
int c = ctl.get();
// workerCountOf:获取工作线程个数
// 当前工作线程个数 < 核心线程数
if (workerCountOf(c) < corePoolSize) {
// 添加一个工作线程 true:核心线程 false:非核心线程
// 添加工作线程会有并发问题,如果添加成功返回true,失败返回false
if (addWorker(command, true))
// 如果添加成功,告辞~~~
return;
// 添加失败,重新获取ctl,看下什么情况
c = ctl.get();
}
// isRunning:线程池状态还是RUNNING嘛?
// 如果是RUNNING,就把任务扔到阻塞队列里,先排队等着。
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取ctl属性
int recheck = ctl.get();
// 再次确认线程池状态是否是RUNNING,如果不是RUNNING,将任务从阻塞队列移除
if (!isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 现在阻塞队列有任务,但是没有工作线程
else if (workerCountOf(recheck) == 0)
// 创建一个非核心线程,去处理阻塞队列任务
addWorker(null, false);
}
// 如果任务没有扔到阻塞队列,添加非核心线程去处理任务
else if (!addWorker(command, false))
// 如果添加非核心线程失败,执行拒绝策略
reject(command);
}
5 添加工作线程的流程
添加工作线程本质就是addWorker方法的流程
private boolean addWorker(Runnable firstTask, boolean core) {
//=====================线程池状态&工作线程个数的判断========================================
retry:
for (;;) {
// 拿ctl并且拿到高三位的值
int c = ctl.get();
int rs = runStateOf(c);
// 状态大于等于SHUTDOWN(如果状态不是RUNNING,不需要添加工作线程)
if (rs >= SHUTDOWN &&
// 阻塞队列有任务,没有工作线程,会addWorker(null,false);
// 线程池状态是SHUTDOWN,并且阻塞队列任务,没有工作线程处理
// 针对addWorker(null,false)的放行
!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
// 进来就无法处理,因为状态不对,直接告辞~
return false;
// 判断工作线程个数
for (;;) {
// 获取工作线程个数
int wc = workerCountOf(c);
// 如果大于低29位,直接告辞。
if (wc >= CAPACITY ||
// 根据core决定判断核心线程个数还是最大线程个数
wc >= (core ? corePoolSize : maximumPoolSize))
// 个数不够~
return false;
// 基于CAS对ctl + 1
if (compareAndIncrementWorkerCount(c))
// 如果成功,直接跳出外层for循环
break retry;
// 到这,说明出现了并发情况,重新获取一次ctl
c = ctl.get();
// 如果状态改变了。重新走外层for循环
if (runStateOf(c) != rs)
continue retry;
}
}
//=====================创建工作线程&启动工作线程========================================
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// new一个工作线程,将任务扔进去。
w = new Worker(firstTask);
// 拿到工作线程的Thread
final Thread t = w.thread;
// Thread是ThreadFactory构建出来的,如果构建的是null,是程序员提供的ThreadFactory有问题
if (t != null) {
// 加锁,因为后面要操作hashSet还有一个int类型,保证线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 状态是RUNNING
if (rs < SHUTDOWN ||
// 这个是为了addWorker(null,false)情况的判断
(rs == SHUTDOWN && firstTask == null)) {
// 线程启动了么?避免程序员提供的ThreadFactory有问题
if (t.isAlive())
throw new IllegalThreadStateException();
// 工作线程扔Set集合,用HashSet维护的工作线程
workers.add(w);
int s = workers.size();
// largestPoolSize是记录线程池中工作线程数的最大值记录
if (s > largestPoolSize)
largestPoolSize = s;
// 添加成功!
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 如果添加成功,启动工作线程,并且设置workerStarted为true
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
6 Worker的封装&后续任务的处理
调用Worker的有参构造时,会基于ThreadFactory构建线程,并且将Worker对象本身作为Runnable 传入到Thread对象中,当执行Thread的start方法时,会执行Worker的run方法,最终执行是run方法内部的runWorker方法。
后续处理任务的方式~
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
boolean completedAbruptly = true;
try {
// 任务的获取方式有2种
// 1:直接基于addWorker携带过来的任务,优先处理。
// 2:如果addWorker携带的任务处理完毕,或者没携带任务,直接从阻塞队列中获取
while (task != null || (task = getTask()) != null) {
// 处理任务!!!! 省略部分代码
task.run();
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
for (;;) {
try {
Runnable r = timed ?
// 从阻塞队列获取任务
// 如果执行poll,代表是非核心线程,等待一会,没任务,直接告辞
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 如果执行take,代表是核心线程,死等。
workQueue.take();
if (r != null)
return r;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}