线程池有几个重要的属性,核心线程数,最大线程数,阻塞任务队列。
一、调度流程
1. 接收新的任务后,先判断核心线程数是否已满,未满则创建新的线程去执行任务
2. 如果核心线程数已满,再判断阻塞任务队列是否已满,未满则将新任务添加至阻塞队列
3. 如果阻塞任务队列已满,再判断最大线程数是否已满,未满则创建线程去执行任务
4. 如果最大线程数已满,则执行拒绝策略
注意:如果核心线程数已满,但阻塞任务队列未满时,不会创建线程去执行任务,会一直等待,所以如果这 3 个参数设置的不合理可能导致阻塞队列一直添加元素至资源耗尽。
二、调度器的钩子方法
三个钩子方法存在于ThreadPoolExecutor类,这3个方法都是空方法,一般会在子类中重写
protected void beforeExecute(Thread t, Runnable r)
- 每个任务执行前都会调度
protected void afterExecute(Runnable r, Throwable t)
- 每个任务执行后都会调度
protected void terminated()
- 调度器完全终止时调度
@Slf4j
public class ExecutorsTest {
static class TargetTask implements Runnable {
public static final int SLEEP_GAP = 1000;
// AtomicInteger 提供了一种线程安全的方式来对整数进行操作。它可以保证多个线程同时对同一个AtomicInteger对象进行操作时,不会出现数据竞争和不一致的情况
static AtomicInteger taskNo = new AtomicInteger(0);
String taskName;
@Override
public void run() {
log.info(Thread.currentThread().getName()+": "+taskName + "is running ...");
try {
Thread.sleep(SLEEP_GAP);
} catch (Exception e) {
log.error(Thread.currentThread().getName()+": "+taskName + "running error !");
e.printStackTrace();
}
log.info(Thread.currentThread().getName()+": "+ taskName + "is end ...");
}
TargetTask() {
taskName = "task_" + taskNo;
taskNo.incrementAndGet();
}
}
static class TargetTaskWithError extends TargetTask {
public void run() {
super.run();//执行父类的run方法
// 强行抛出异常
throw new RuntimeException("Error from " + taskName);
}
}
static class MyThreadFactory implements ThreadFactory {
static AtomicInteger threadNo = new AtomicInteger(1);
@Override
public Thread newThread(@NotNull Runnable r) {
String threadName = "MyThreadFactory_" + threadNo;
log.info("使用线程工厂创建一个线程 名字:" + threadName);
threadNo.incrementAndGet();
return new Thread(r, threadName);
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService pool9 = new ThreadPoolExecutor(1,1,0,TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.info(" 任务执行前打印 ");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
log.info(" 任务执行打后印 ");
}
@Override
protected void terminated() {
super.terminated();
log.info(" 调度器终止时执行 terminate ");
}
};
for (int i = 0; i < 2; i++) {
pool9.submit( new TargetTask());
}
pool9.shutdown();
}
}
三、线程池的拒绝策略
- AbortPolicy 直接拒绝策略
默认拒绝策略,提交的新任务直接被拒绝,并且会抛出异常 RejectedExecutionException
- DiscardPolicy 抛弃策略
提交的任务直接被拒绝丢掉,不会抛出异常
- DiscardOldestPolicy 抛弃最老任务策略
将阻塞队列中最旧的任务丢掉(一般是队列头的元素),再尝试将新任务添加到队列中
- CallerRunsPolicy:调用者执行策略
新任务提交到线程池时发现已经满了,那么提交任务的线程会去执行该任务,不使用线程池的线程(例如:主线程)
- 自定义策略
实现 RejectedExecutionHandler 接口,重写其 rejectedExecution() 方法