文章目录
- 概述
- 线程池提交任务流程
- 线程池提交任务源码阅读
- 源码阅读
- 属性字段
- 工作线程worker
- 线程池方法
- runWorker(Worker w) 运行工作线程
- getTask() 获取任务
- tryTerminate() 尝试终止线程池
- interruptWorkers、interruptIdleWorkers 中断工作线程
- reject(Runnable command) 拒绝任务
- processWorkerExit 工作线程完成任务后退出
- 构造函数
- shutdown shutdownNow 关闭线程池
- allowCoreThreadTimeOut 允许核心线程超时
- setMaximumPoolSize 设置最大线程池数量
- setCorePoolSize 设置核心线程数量
- setKeepAliveTime 设置空闲线程超时时间
- remove 移除任务
- getPoolSize 获取当前工作线程数量
- RejectedExecutionHandler 拒绝策略
概述
线程池提交任务流程
回顾一下线程池提交任务流程:
-
- 先判断核心线程池是否已经已满,即工作线程数是否大于核心线程数,如果不是则创建核心工作线程执行该任务,否则下一步
-
- 判断工作队列是否已经满了,如果队列没有满,则将该任务入队,否则进入下一步
-
- 判断线程池中的线程是否已经满了,即是否大于最大线程数,如果不是则创建工作线程执行该任务,否则拒绝这个任务
线程池提交任务源码阅读
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 工作线程数量
int c = ctl.get();
// 1. 线程池先判断核心线程池是否已经已满,即工作线程数是否大于核心线程数,如果不是则创建核心工作线程执行该任务
if (workerCountOf(c) < corePoolSize) {
// 新增核心的工作线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 判断工作队列是否已经满了,如果队列没有满,则将该任务入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 线程池不再运行 或 移除当前线程
if (! isRunning(recheck) && remove(command))
// 拒绝当前线程
reject(command);
else if (workerCountOf(recheck) == 0)
// 没有正在工作的线程了 新增一个工作线程去执行任务
addWorker(null, false);
}
// 3. 判断线程池中的线程是否已经满了,即是否大于最大线程数,如果不是则创建工作线程执行该任务,否则拒绝这个任务。
else if (!addWorker(command, false))
// 拒绝当前任务
reject(command);
}
新增工作线程, core参数true新增核心工作线程, false非核心线程工作线程:
// 新增工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 状态
int rs = runStateOf(c);
// 停止 或 没有需要处理的任务 + 队列为空
// 无需新增
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 当前工作线程数量
int wc = workerCountOf(c);
// 工作线程大于最大容量失败
// 根据请求参数判断
// 新增核心线程大于最大核心线程数失败
// 新增非核心线程数,大于最大线程数量也失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// workerCount + 1, 成功进入下一步
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 状态变更 重新开始
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// 运行状态
int rs = runStateOf(ctl.get());
// 运行中状态 或 终止且无要执行的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加工作线程
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 新增工作线程失败,回滚,移除工作线程,数量-1, 尝试终止线程池
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
源码阅读
JUC包下的工具类,继承了AbstractExecutorService
package java.util.concurrent;
// 继承 AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {
属性字段
// 前3位线程池状态、后29位线程数量
// 初始化运行中
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// ctl 后29位记录线程池容量, 32 - 3
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大容量 536870911
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// RUNNING(-536870912):接受新任务或者处理队列里的任务。
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN(0):不接受新任务,但仍在处理已经在队列里面的任务。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP(536870912):不接受新任务,也不处理队列中的任务,对正在执行的任务进行中断。
private static final int STOP = 1 << COUNT_BITS;
// TIDYING(1073741824): 所以任务都被中断,workerCount 是 0,整理状态
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED(1610612736): terminated() 已经完成的时候
private static final int TERMINATED = 3 << COUNT_BITS;
//runState 之间的转变过程:
//RUNNING -> SHUTDOWN:调用 shudown(), finalize()
//(RUNNING or SHUTDOWN) -> STOP:调用 shutdownNow()
//SHUTDOWN -> tryTerminate() -> TIDYING -> 此时Workers数量为0
//STOP -> tryTerminate() -> TIDYING -> 此时Workers数量为0
//TIDYING -> TERMINATED -> terminated() 执行完成之后
// 前3位线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 后29位线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 主锁
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程列表
private final HashSet<Worker> workers = new HashSet<Worker>();
// 终止条件
private final Condition termination = mainLock.newCondition();
// 最大线程池数量
private int largestPoolSize;
// 完成任务的数量
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 线程空闲保活时间
private volatile long keepAliveTime;
// 是否允许核心线程超时
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数量
private volatile int corePoolSize;
// 最大线程数量
private volatile int maximumPoolSize;
// 默认拒绝策略,放弃,抛出异常
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
private final AccessControlContext acc;
有关状态和工作线程数量的方法:
// rs:runState 运行状态 wc:workerCount 线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
// c:当前状态 , 是否小于 s 状态
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// c:当前状态 , 是否大于等于 s 状态
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// RUNNING:-1 SHUTDOWN:0
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// 线程数量+1,成功返回 true、失败false
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// 线程数量-1,成功返回 true、失败false
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 线程数量-1
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
* CAS 设置 执行状态
* ctl = targetState + workerCountOf(c)
*/
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
// 是运行状态 或者 停止,停止 且 shutdownOK 才返回 true
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
工作线程worker
工作线程worker继承了 AQS, 实现了Runnable接口
// 工作线程
// 继承了 AQS
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* 序列化id
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Worker中执行的线程,ThreadFactory创建*/
final Thread thread;
/** 线程执行的任务,可能为空 */
Runnable firstTask;
/** 完成的任务数量 */
volatile long completedTasks;
/**
* 使用来自ThreadFactory的给定第一个任务和线程创建
*/
Worker(Runnable firstTask) {
// 设置执行的任务
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
// 执行任务
runWorker(this);
}
// 当前的线程是否已经获取到了同步状态
// state 0未获取,1获取
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 获取信号量, 尝试state从0设置为1
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放信号量, state设置为0
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 中断线程
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
线程池方法
runWorker(Worker w) 运行工作线程
/**
* 运行Worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 异常完成
boolean completedAbruptly = true;
try {
// 如果task为null,且尝试获取任务也为null,结束循环
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// (线程池开始停止 或者 (线程中断 且 线程池开始停止)) 且 wt 没有中断
// Thread.interrupted()是一个静态方法, 用于判断当前线程是否被中断, 并清除中断标志位
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 留给子类
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 留给子类
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask() 获取任务
// 获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果当前状态不是运行中 或者 STOP、TIDYING、TERMINATED 阻塞队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 工作线程-1
decrementWorkerCount();
return null;
}
// 工作线程
int wc = workerCountOf(c);
// Are workers subject to culling?
// 允许核心线程超时 或者 工作线程数量大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1. 工作线程大于最大线程数 或 (timed 且 超时)
// 2. 工作线程数量>1 或者 工作队列为空
// 1 且 2
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从阻塞队列取出任务,timed 则有时间限制的取出
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
tryTerminate() 尝试终止线程池
/**
* 尝试终止线程池
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果还在运行中 或 已经TERMINATED 或 SHUTDOWN 还有工作线程
// 无法 treminate
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// terminated 方法之后 状态从 TIDYING 修改为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
interruptWorkers、interruptIdleWorkers 中断工作线程
interruptIdleWorkers 中断空闲的工作线程
interruptIdleWorkers 需要去获取锁, 如果任务正在执行中是获取不到锁的, 不会中断正在执行的任务
/**
* 中断所有的工作线程
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 中断已经运行的任务
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// 中断Workers
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
// 中断所有Workers
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
reject(Runnable command) 拒绝任务
// 拒绝任务
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
processWorkerExit 工作线程完成任务后退出
// 工作线程完成任务后处理
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 意外完成, 工作线程数量-1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 移除工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 如果不是意外退出
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
构造函数
/**
* 构造
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* 构造
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* 构造
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 构造
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
shutdown shutdownNow 关闭线程池
/**
* 关闭
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
/**
* 现在关闭
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
/*
*是否正在终止
*/
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
// 线程池是否已经终止
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
// 超时等待线程池终止
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
/**
* 结束
*/
protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
shutdown();
} else {
PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
AccessController.doPrivileged(pa, acc);
}
}
allowCoreThreadTimeOut 允许核心线程超时
/**
* 允许核心线程超时
*/
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
setMaximumPoolSize 设置最大线程池数量
/**
* 设置最大线程池数量
*/
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
setCorePoolSize 设置核心线程数量
设置设置核心线程数量
如果工作线程大于新的核心线程数量, 中断空闲线程
如果核心线程数量变多了, 新增Math.min(delta, workQueue.size())个工作线程
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
setKeepAliveTime 设置空闲线程超时时间
如果设置的keepAliveTime小于老的keepAliveTime, 中断空闲工作线程
/**
* 设置空闲线程超时时间,中断wokers
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
remove 移除任务
/**
* 移除任务
*/
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
getPoolSize 获取当前工作线程数量
/**
* 工作线程数量
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
RejectedExecutionHandler 拒绝策略
/**
* 只用调用者所在线程来运行任务
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* 抛出异常RejectedExecutionException
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* 默默丢弃拒绝的任务
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* 丢弃队列最近的一个任务,并执行当前任务
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}