1、定义线程池
/**
* 使用给定的初始参数创建新的 ThreadPoolExecutor对象,就创建了一个线程池
* @param corePoolSize - 要保留在池中的线程数,即使它们处于空闲状态,若果allowCoreThreadTimeOut设置为ture,那么核心线程在keepAliveTime之后也会被回收
* @param maximumPoolSize – 池中允许的最大线程数
* @param keepAliveTime – 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最长时间,如果为0,表示不回收
* @param unit – keepAliveTime 参数的时间单位
* @param workQueue – 用于在执行任务之前保存任务的队列。此队列将仅保存由execute方法提交的可运行任务。
* @param threadFactory – 执行程序创建新的线程处理程序时使用的工厂,可以自定义线程名称,用于排错
* @param handler – 由于达到线程边界和队列容量而被阻止执行时使用的处理程序ThreadPoolExecutor定义了四种拒绝策略,可以通过实现RejectedExecutionHandler接口自定义拒绝侧率
* CallerRunsPolicy:如果线程池是RUNNING状态,直接调用run方法执行,不用线程池调用服务,否则丢弃任务
* AbortPolicy:抛出异常,线程池默认的拒绝策略
* DiscardPolicy:直接丢弃任务
* DiscardOldestPolicy:如果线程池是RUNNING状态,移除队列头部的任务(下一个任务不在执行),直接用线程池执行当前提交的任务,等于把当前任务替换了快要执行的下一个任务
* //自定义拒绝策略,如:假如不能丢弃任务,我们也不能无限制的往队列里面添加任务,容易内存不足报错,所以我们可以重新把任务一直往队列里面放,并且发出通知,不要产生新任务,另外一个地方定时检查线程池大小,发出通知可以产生新任务
* (r, e)->{
* e.getQueue().put(r);
* }
* @Throws:
* IllegalArgumentException – 非法参数异常 – 如果以下情况之一成立:corePoolSize < 0 keepAliveTime < 0 maximumPoolSize <= 0 maximumPoolSize < corePoolSize
* NullPointerException – if workQueue or threadFactory or handler is null
**/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池自定义的4种拒绝策略,通过new ThreadPoolExecutor.AbortPolicy()就可以使用了
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//判断线程池是不是RUNNING
r.run();
//直接调用run方法
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
//抛异常
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//不做任何事,任务丢弃
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//判断线程池是不是RUNNING
e.getQueue().poll();
//移除头部的任务
e.execute(r);
//直接用线程池执行当前方法,就是替换下方法
}
}
}
线程池的五种状态
RUNNING:运行状态
SHUTDOWN:线程池关闭,不在接收新任务,但线程池队列中的任务还是会执行完成
STOP:线程池停止,不接受新任务,且尝试终止线程池队列汇总的任务
TIDYING:线程池队列中所有的任务已经完成
TERMINATED:线程池状态为终止
//存储了线程池状态和工作线程数量,CAS自旋设置
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
2、继承ThreadPoolExecutor可以实现自己的线程池方案,可以重写下面几个保护方法,也可以覆盖父类的public方法,从而实现自己的线程池逻辑,也可以直接继承AbstractExecutorService抽象方法实现
/**
* GC回收,
*
* @param t
* @param r
*/
protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
//校验能不能终止线程,CAS设置线程池状态为SHUTDOWN,终止所有的线程,尝试终止线程池
shutdown();
} else {
PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
AccessController.doPrivileged(pa, acc);
}
}
/**
* 线程执行开始前
*
* @param t
* @param r
*/
protected void beforeExecute(Thread t, Runnable r) {
}
/**
* 线程执行完成后
*
* @param t
* @param r
*/
protected void afterExecute(Runnable r, Throwable t) {
}
/**
* 线程池终止方法(在线程池等待队列为空,但线程池状态不是TERMINAL之前)
*/
protected void terminated() {
}
ScheduledThreadPoolExecutor就是继承了ThreadPoolExecutor并且实现了ScheduledExecutorService接口实现定时器功能
3、线程池的执行过程
3.1、提交任务
一般都是线程池的execute和submit方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
//封装一个又返回值的任务对象
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
什么时候线程池会执行拒绝策略:
- 线程池从RUNNING && 线程池队列未满,但任务添加进入线程池后,线程池状态变为非RUNING;
- 线程池不是RUNNING或者线程池队列已满,尝试添加任务执行(主要是能不能新建线程执行)失败;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//如果当前工作线程总个数<核心线程,直接新增一个任务,如果新增成功,直接返回,说明有别的任务先提交更新了线程池信息
if (addWorker(command, true))
return;
//获取新的线程池信息
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
//线程池是否运行中 && 线程池队列未满,可以新增任务
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//如果线程池不是运行的,移除任务,移除任务会调用回收资源,尝试终止线程操作,调用线程池实现的拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
//如果线程池是运行状态,并且工作线程个数为0,直接添加任务,这里面是执行任务的真正逻辑
addWorker(null, false);
}
else if (!addWorker(command, false))
//如果线程池状态非RUNNING || 队列线程池已满,那就尝试添加任务,如果不成功,调用线程池实现的拒绝策略
reject(command);
}
真正开始执行任务(执行线程,都需要执行当前方法)
线程池在SHUTDOWN状态之后是不是不能执行任务了
线程池在非RUNNING状态不能提交任务到线程池队列,但在SHUTODWN状态下,如果线程池队列不为空,可以继续执行线程池队列中的任务,直到线程池队列状态为空,才跳出循环
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//线程池状态不是RUNNING获取SHUTDOWN && (SHUTDOWN && 当前正在执行的任务为空 && 线程池队列里面不为空)->线程池不是RUNNING || 线程池是SHUTDOWN,但正在执行的线程不为NULL || 线程池队列里面存在任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//添加任务失败
return false;
for (;;) {
int wc = workerCountOf(c);
//线程池中的工作线程大于等于最大值 || (大于核心线程 || 最大线程数,根据入参是true还是false判断),返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//添加任务失败
return false;
//CAS增加一个worker,增加成功,退出循环跳到retry:,往下执行
if (compareAndIncrementWorkerCount(c))
break retry;
//如果没有设置成功
c = ctl.get(); // Re-read ctl
//线程状态以改变,继续循环,重新执行for循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//获取堆的第一个任务,时间最前面,并根据线程工厂创建线程,Worker重写了toString方法,Worker对象实现了Runnable接口,并且创建线程时把this(当前worker对象作为Runnable放入了Thread,当t.start时就是执行worker对象的run方法)
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁,这个线程不能中断
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//线程池状态RUNNING 或者 SHUTDOWN && 堆首任务不存在
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果线程已经执行,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//把新的任务加入到workers里面,workers就是个Set,如果toString方法相同,覆盖
workers.add(w);
//获取当前工作组个数,如果大于最大值,更新最大值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//新增工作成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//新增成功,开始执行任务t为Worker绑定的工作线程,Worker的工作线程传入的是它自己,也就是启用调用的是Worker的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker的构造方法方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
//堆首任务最先执行,ScheduledThreadPoolExecutor.ScheduledFutureTask类型
this.firstTask = firstTask;
//获取到Bean工厂就是ThreadPoolTaskScheduler对象,CustomizableThreadFactory->CustomizableThreadCreator创建线程,并且传入this(当前worker对象,需要实现Runnable接口)
this.thread = getThreadFactory().newThread(this);
}
Worker的toString方法
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down"));
//线程池状态,工作线程个数,未激活的线程个数,队列长度,未完成的任务数量
return super.toString() +
"[" + rs +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
执行Worker对象的run方法,如果线程池状态为STOP,任务是否执行
当前worker线程在执行之前,会判断线程池状态是否是STOP,如果是,则终止线程
public void run() {
runWorker(this);
}
final void runWorker(ThreadPoolExecutor.Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
//防止任务重复执行
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//如果线程池状态是STOP,终止线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行当前任务之前做预处理,空方法,子类可以实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//此处实现的是addWorker传入的Runnable,,一般线程为用户提交的Runnable,定时器调度线程是ScheduledFutureTask类,调用它的run方法
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
//执行当前任务完成之后做预处理,空方法,子类可以实现
afterExecute(task, thrown);
}
} finally {
//回收任务,并用于跳出循环,线程池任务完成+1
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask()获取线程的方法
- 线程池RUNNING或者SHUTDOWN && 线程池队列不为空,则阻塞线程池
- 如果需要回收线程池allowCoreThreadTimeOut 为true或者活动线程>核心线程,但如果keepAliveTime<=0,直接返回NULL任务,会跳出循环;
- take阻塞线程队列,等待队列中存在新的任务唤醒;
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//线程池不为RUNNING/或者是SHUTDOWN && 线程池为空,返回执行任务为NULL
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//允许回收核心线程 || 当前执行的线程个数大于核心线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//如果大于最大线程,并且可以回收
if (compareAndDecrementWorkerCount(c))
//减少一个Worker数量成功,返回run,否者继续循环等待线程回收
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//如果线程对象可以回收,使用poll等待回收时间,如果不能回收,使用take()阻塞线程池
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
//阻塞时间
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
//获取队列头部信息
c = count.getAndDecrement();
if (c > 1)
//如果队列大于值在查询之前>1,表示队列中现在至少存在2个任务未执行,需要唤醒别的的等待获取任务的线程,每次唤醒一个线程,最后会根据存在的任务,线程池提交的阻塞任务都会被唤醒
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//获取任务为空,阻塞线程,等待唤醒
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
//如果队列大于值在查询之前>1,表示队列中现在至少存在2个任务未执行,需要唤醒别的的等待获取任务的线程,每次唤醒一个线程,最后会根据存在的任务,线程池提交的阻塞任务都会被唤醒
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
任务执行完成,更新线程池状态,尝试终止线程池,回收线程资源
线程回收,线程池可能保存的最小线程,存在四种情况
- 如果allowCoreThreadTimeOut=false,最小线程等于核心线程;
- 如果allowCoreThreadTimeOut=true,并且线程池队列不为空,那么最小线程数量为1;
- 如果allowCoreThreadTimeOut=true,并且线程池队列为空,那么最小线程数为0;
- 如果当前工作线程>=上面存在的三种情况获得的最小线程数,那无需回收,否则回收线程;
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
//如果任务执行失败,抛异常等,任务执行完成-1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//更新执行完成任务数量
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试终止回收线程
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 如果状态是RUNNING或者SHUTDOWN
if (!completedAbruptly) {
//任务执行正常完成,默认false,如果设置为true,那最小线程就为0,否者为核心线程数量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
//如果核心线程
min = 1;
if (workerCountOf(c) >= min)
//如果工作线程大于等于最小线程,直接返回,无需回收
return; // replacement not needed
}
//因为新增Running为空,worker新增失败,执行addWorker的addWorkerFailed方法,回收线程
addWorker(null, false);
}
}
addWorkerFailed回收资源
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
//worker不为空,移除worker
workers.remove(w);
//工作worker减少1
decrementWorkerCount();
//尝试终止线程,回收资源
tryTerminate();
} finally {
mainLock.unlock();
}
}
线程池执行remove方法,尝试回收线程资源
final void tryTerminate() {
//没有退出条件的循环
for (;;) {
//ctl存储的是线程池状态和工作线程个数
int c = ctl.get();
//线程状态 RUNNING||(TIDYING || TERMINATED)||(SHUTDOWN && 线程队列不为空),直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//线程执行任务不为0,但线程池状态只能为STOP
if (workerCountOf(c) != 0) { // Eligible to terminate
//终端线程,回收资源,回收一个之后,下次执行到这里,状态还是STOP,worker还是不为0 ,又终止一个,直到都终止完成,之后对不是终止状态的线程池终止
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//如果线程状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//终止线程,空方法,子类可以实现从TIDYING状态变为TERMINATED状态之前做一定逻辑判断
terminated();
} finally {
//设置线程池状态为终止
ctl.set(ctlOf(TERMINATED, 0));
//唤醒所有的终止任务,根据链表,通过unpark一个一个暂停线程。
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
终端Worker线程,回收Worker线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
//获取所有活动的线程对象
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
//中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//终端一个后跳出循环
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
//终止成功,移除任务
remove(this);
return cancelled;
}
public boolean cancel(boolean mayInterruptIfRunning) {
//如果状态不是新 || 线程状态不能设置为CANCELLED,返回false,不能中断
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
//中断线程
t.interrupt();
} finally { // final state
//设置线程状态为中断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//回收线程资源
finishCompletion();
}
return true;
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//循环队列节点
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
//当前节点循环
Thread t = q.thread;
if (t != null) {
q.thread = null;
//终止线程
LockSupport.unpark(t);
}
//获取当前节点的下游节点,如果为空,表示已当前节点作为初始节点的下游节点都已经被终止了
WaitNode next = q.next;
if (next == null)
break;
//下一个worker节点为空,解绑worker和线程池关系,GC能正常回收
q.next = null; // unlink to help gc
//把它的下游节点设置为头部节点,循环上终止,移动节点任务
q = next;
}
break;
}
}
//空方法,子类可以实现,比如判断线程是否都已回收,worker已经未0等等
done();
callable = null; // to reduce footprint
}
4、Spring提供的4种线程池,阿里云一般推荐自定义线程池,就是new ThreadPoolExecutor
Executors类通过静态方法定义了5种线程池
- 固定线程个数的线程池,核心线程=最大线程=入参
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 单线程线程池,类似固定大小,只是线程大小为1
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 缓存线程池,核心线程为0,默认缓存60s,就会回收不用的线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 单线程定时调度线程池,能执行定时任务的单线程线程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
- 设置核心线程个数的定时调度线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
- 传入自定义的线程池
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}
- 传入自定义的定时任务线程池
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedScheduledExecutorService(executor);
}