1.创建线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
10–核心线程数
20–最大线程数
1 TimeUnit.MINUTES 非核心线程数存活时间 1分钟
new ArrayBlockingQueue(100) 阻塞队列类型 数组类型传入队列长度,需要无限长度可以使用链表类型LinkedBlockingDeque
线程工厂ThreadFactory和超出队列的策略ThreadPoolExecutor.AbortPolicy(抛出异常)暂时先按默认来。
ThreadPoolExecutor executor = new ThreadPoolExecutor
(10,20,1, TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(100));
创建线程池的时候是不会启动线程的,需要在执行具体业务逻辑时候才会执行
2.ThreadPoolExecutor重要参数及方法介绍
//ctl Int原子操作类,32位,前三位代表线程池状态,后28位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
//线程池5种状态
//RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】
private static final int RUNNING = -1 << COUNT_BITS;
//SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
private static final int SHUTDOWN = 0 << COUNT_BITS;//执行shutDown()方法
//STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】
private static final int STOP = 1 << COUNT_BITS;//执行shutDownNow()方法
//TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
private static final int TIDYING = 2 << COUNT_BITS;
//TERMINATED状态 terminated()执行完之后就会转变为TERMINATED
private static final int TERMINATED = 3 << COUNT_BITS;
//获取线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//获取当前工作线程数
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.1线程的五种状态
- RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】 11100000 00000000 00000000 00000000
- SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
00000000 00000000 00000000 00000000 - STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】 00100000 00000000 00000000 00000000
- TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
00100000 00000000 00000000 00000000 - TERMINATED状态 terminated()执行完之后就会转变为TERMINATED 01100000 00000000 00000000 00000000
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* Attempts to CAS-increment the workerCount field of ctl.
* 通过CAS来对当前工作线程数增加
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
* 通过CAS来对当前工作线程数减少
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
任务执行流程图
3.提交任务execute
executor.execute(new Runnable() {
@Override
public void run() {
//业务代码
}
});
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取ctl 初始值为ctlOf(RUNNING, 0) 运行状态,工作线程数0
int c = ctl.get();
//计算获取工作线程数<核心线程数
if (workerCountOf(c) < corePoolSize) {
//当前command增加为核心工作线程,添加失败下面会进行入队操作
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断线程池状态(判断是因为防止别的线程把状态进行修改)
//workQueue.offer(command) 加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再对线程池状态二次检查,如果不是running则移除队列
if (! isRunning(recheck) && remove(command))
//拒绝策略,默认抛出异常
reject(command);
else if (workerCountOf(recheck) == 0)
//这里就是执行队列中的任务,下面addWorker里面有体现和讲解
addWorker(null, false);
}
//线程池达到最大了的maxPool,添加失败执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
3.1 submit
Future<?> submit = executor.submit(new Runnable() {
@Override
public void run() {
//业务代码
}
});
这个里面执行了execute,多了一个返回Future
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
4.addWorker
这里不同版本jdk有差异
private boolean addWorker(Runnable firstTask, boolean core) {
//类似于goto
retry:
for (int c = ctl.get();;) {
// 线程池状态>=SHUTDOWN 并且 线程池状态>=STOP或者传入的任务!=null或者阻塞队列为空则返回
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
//
//判断工作的线程是否超过核心线程数或者最大线程数,addWork时候会传入core
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//如果没有超过核心线程数或者最大线程数,这里通过cas对工作线程数量增加,多个竞争失败的话循环cas操作
if (compareAndIncrementWorkerCount(c))
break retry;//跳出外层循环
c = ctl.get(); // Re-read ctl
//如果线程池状态>=SHUTDOWN 跳到外层循环继续执行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建任务Worker,会利用线程工厂去创建一个线程默认的是
/**
* Worker(Runnable firstTask) {
* //这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0
* // 正常应该是acquire时候+1 release时候-1 这里重写过方法
* setState(-1);
* this.firstTask = firstTask;
* this.thread = getThreadFactory().newThread(this); }
**/
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//这里的mainLock是对Workers进行操作的,防止出现并发问题
//用锁是因为private final HashSet<Worker> workers = new HashSet<>(); 这个不是线程安全的
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
//线程池如果是RUNNING状态
// 或者状态<STOP并且传入的任务为空 这个是从阻塞队列里面拿任务执行
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // 如果线程已经在运行,就抛出异常
throw new IllegalThreadStateException();
//添加任务到工作线程的容器里
workers.add(w);
int s = workers.size();
//largestPoolSize 这个是记录工作线程数,没看到具体作用,但既然有肯定是有用的
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//这里才到线程运行
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//这里类似于一个回滚操作,异常情况会对worker进行移除,修改ctl
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
5.Worker相关
5.1 构造器
Worker(Runnable firstTask) {
//这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0
// 正常应该是acquire时候+1 release时候-1 这里重写过方法
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
5.2 tryAcquire和tryRelease
重写过从+1,-1变成cas为1和设置为0,0代表执行完任务空闲,1代表在执行任务,里面有个
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
5.3 runWork
public void run() {runWorker(this);}
final void runWorker(Worker w) {
//获取当前工作线程
Thread wt = Thread.currentThread();
//获取需要执行的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//执行任务不为空 或者 队列中获取到了需要执行的任务
//如果没有获取到getTask是会阻塞的
while (task != null || (task = getTask()) != null) {
w.lock();
//如果线程池状态>=STOP 并且当前线程没有被打断
//线程池被打断并且线程池状态>=STOP 并且当前线程没有被打断
//这里是对线程池状态作验证,如果状态发生了变更则要去尝试中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行前切面 可以用来记录工作中线程和计算空闲线程,Tomcat线程池有这个行为
beforeExecute(wt, task);
try {
task.run();
//执行后或异常切面
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
//执行任务数
w.completedTasks++;
w.unlock();
}
}
//正常执行才会为false,表示正常退出
completedAbruptly = false;
} finally {
//执行失败completedAbruptly为true
processWorkerExit(w, completedAbruptly);
}
}
5.4 getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 线程池状态不为RUNNING,队列为空就不需要处理任务了,直接返回空,上层runWorker也会正常退出循环
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//工作中的线程数量
int wc = workerCountOf(c);
// 核心线程是否超时回收标志,可以通过executor.allowCoreThreadTimeOut(true);设置
//工作线程数量>核心线程数量
//用来判断是否是无限阻塞
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//大于最大线程数或者超时 并且 工作线程数量>1或者队列为空 则ctl减少
// && (wc > 1 || workQueue.isEmpty()) 这个判断就是要留下至少一个线程去处理队列中的任务
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//超时阻塞和无限阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//超时,循环时会去处理返回null
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
6.shutdown()
shutdown会把线程池状态修改为SHUTDOWN,提交新任务会抛出异常,但会继续执行队列中的任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改状态为SHOUTDOWN,并修改ctl
advanceRunState(SHUTDOWN);
//这里会中断工作中的线程
interruptIdleWorkers();
onShutdown(); // 空方法
} finally {
mainLock.unlock();
}
tryTerminate();
}
//中间还有个方法,传入的onlyOne为false
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历Workers,遍历前加锁
for (Worker w : workers) {
Thread t = w.thread;
//把没有被打断并且没有在工作中的线程打断
//获取到锁说明线程是空闲的,没有获取到锁说明在执行任务
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
7.shutdownNow()
shutdownNow会把线程池状态修改为STOP,提交新任务会抛出异常,也不执行队列中的任务。但会返回队列中的任务。
List<Runnable> runnables = executor.shutdownNow();
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改状态为STOP,并修改ctl
advanceRunState(STOP);
//中断线程
interruptWorkers();
//返回队列中的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//最后一个线程结束时候会把线程池状态改为TERMINATED
tryTerminate();
return tasks;
}
private void interruptWorkers() {
//中断所有工作线程
for (Worker w : workers)
w.interruptIfStarted();
}
void interruptIfStarted() {
Thread t;
//getState() >= 0 代表空闲线程和正常执行中的线程,不为空并且没有被打断的就执行打断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}