一、线程
1.线程
线程是调度CPU资源的最小单位。java线程与OS线程保持1:1映射关系,也就是说,一个Java线程也会在操作系统里有一个对应线程。
2.线程的生命周期
NEW,新建
RUNNABLE,运行
BLOCKED,阻塞
WAITING,等待
TIMED_WAITING,超时等待
TERMINATED,终结
二、线程池
1.为什么使用线程池
通过减少频繁创建和销毁线程来降低性能损耗。
例如:当并发请求量比较大的时候,单个任务的执行时间很短,频繁创建和销毁线程会大大降低系统的效率。
2.线程池的使用场景
1.单个任务执行时间比较短
2.任务数量比较多
3.Executor线程池
3-1 结构
Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。
package java.util.concurrent;
public interface Executor {
void execute(Runnable var1);
}
Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为:
1、excute(Runnable command) : 履行Runnable类型任务。
2、submit(task) :可以用来提交Callable或Runnable任务,并返回代表此任务的Future对象。
3、shutdown():在完成已提交的任务后封闭,不再接管新任务。
4、shutdownNow() : 停止所有正在履行的任务并封闭。
5、isTerminated() : 测试是否所有任务都履行完毕了。
6、isShutdown() : 测试是否该ExecutorService已被关闭。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
<T> Future<T> submit(Callable<T> var1);
<T> Future<T> submit(Runnable var1, T var2);
Future<?> submit(Runnable var1);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService实现了ExecutorService中的方法:
package java.util.concurrent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
public abstract class AbstractExecutorService implements ExecutorService {
public AbstractExecutorService() {
}
protected <T> RunnableFuture<T> newTaskFor(Runnable var1, T var2) {
return new FutureTask(var1, var2);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> var1) {
return new FutureTask(var1);
}
public Future<?> submit(Runnable var1) {
if (var1 == null) {
throw new NullPointerException();
} else {
RunnableFuture var2 = this.newTaskFor(var1, (Object)null);
this.execute(var2);
return var2;
}
}
public <T> Future<T> submit(Runnable var1, T var2) {
if (var1 == null) {
throw new NullPointerException();
} else {
RunnableFuture var3 = this.newTaskFor(var1, var2);
this.execute(var3);
return var3;
}
}
public <T> Future<T> submit(Callable<T> var1) {
if (var1 == null) {
throw new NullPointerException();
} else {
RunnableFuture var2 = this.newTaskFor(var1);
this.execute(var2);
return var2;
}
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> var1, boolean var2, long var3) throws InterruptedException, ExecutionException, TimeoutException {
if (var1 == null) {
throw new NullPointerException();
} else {
int var5 = var1.size();
if (var5 == 0) {
throw new IllegalArgumentException();
} else {
ArrayList var6 = new ArrayList(var5);
ExecutorCompletionService var7 = new ExecutorCompletionService(this);
boolean var23 = false;
Object var14;
try {
var23 = true;
ExecutionException var8 = null;
long var9 = var2 ? System.nanoTime() + var3 : 0L;
Iterator var11 = var1.iterator();
var6.add(var7.submit((Callable)var11.next()));
--var5;
int var12 = 1;
while(true) {
Future var13 = var7.poll();
if (var13 == null) {
if (var5 > 0) {
--var5;
var6.add(var7.submit((Callable)var11.next()));
++var12;
} else {
if (var12 == 0) {
if (var8 == null) {
var8 = new ExecutionException();
}
throw var8;
}
if (var2) {
var13 = var7.poll(var3, TimeUnit.NANOSECONDS);
if (var13 == null) {
throw new TimeoutException();
}
var3 = var9 - System.nanoTime();
} else {
var13 = var7.take();
}
}
}
if (var13 != null) {
--var12;
try {
var14 = var13.get();
var23 = false;
break;
} catch (ExecutionException var24) {
var8 = var24;
} catch (RuntimeException var25) {
var8 = new ExecutionException(var25);
}
}
}
} finally {
if (var23) {
int var18 = 0;
for(int var19 = var6.size(); var18 < var19; ++var18) {
((Future)var6.get(var18)).cancel(true);
}
}
}
int var15 = 0;
for(int var16 = var6.size(); var15 < var16; ++var15) {
((Future)var6.get(var15)).cancel(true);
}
return var14;
}
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedExcepti
3-2 简单使用
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 5, 5000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
for (int i=0;i<5;i++){
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
System.out.println("i m task :"+Thread.currentThread().getName());
}
},i);
}
}
阿里巴巴开发规范:
3-3 线程池属性
public class ThreadPoolExecutor extends AbstractExecutorService {
// 新建ThreadPoolExecutor的时候会对ctl进行初始化,-536870912 | 0
// ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分信息:线程池的运行状态(runState) 和线程池内有效线程的数量(workerCount),使用了Integer类型保存,高3位保存runState,低29位保存workerCount,大约是5亿。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(-536870912, 0)); // 1110 0000 0000 0000 0000 0000 0000 0000
private static final int COUNT_BITS = 29; // 0001 1101
private static final int CAPACITY = 536870911; // 0001 1111 1111 1111 1111 1111 1111 1111
// 线程池状态
// 线程池处在RUNNING状态时,能够接受新任务,以及对已添加的任务进行处理。
private static final int RUNNING = -536870912; // 1110 0000 0000 0000 0000 0000 0000 0000
// 线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
private static final int SHUTDOWN = 0; //0000 0000 0000 0000 0000 0000 0000 0000
// 线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
private static final int STOP = 536870912; // 0010 0000 0000 0000 0000 0000 0000 0000
// 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
private static final int TIDYING = 1073741824; // 0100 0000 0000 0000 0000 0000 0000 0000
// 线程池彻底终止
private static final int TERMINATED = 1610612736; // 0110 0000 0000 0000 0000 0000 0000 0000
}
3-4 源码解析
3-4-1 线程池的具体实现
ThreadPoolExecutor 默认线程池
ScheduledThreadPoolExecutor 定时线程池
3-4-2 线程池的创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数解释:
1>corePoolSize:核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池提前创建并启动所有核心线程。
2>maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。
3>keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
4>unit:keepAliveTime时间的单位;
5>workQueue:用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
6>handler:线程池的饱和(拒绝)策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
AbortPolicy:直接抛出异常,默认策略;
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务;
上面的4种策略都是ThreadPoolExecutor的内部类。也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
3-4-3 ThreadPoolExecutor执行全流程
1) 当前运行的线程数 < corePoolSize,则创建新线程来执行任务;
2)当前运行的线程数 >= corePoolSize,则任务加入阻塞BlockingQueue;
3)阻塞队列BlockingQueue已满,则创建新的线程来处理任务;
4)如果创建新线程将使当前运行的线程 > 最大线程数,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
3-4-3-1 execute执行全流程
public void execute(Runnable var1) {
if (var1 == null) {
throw new NullPointerException();
} else {
int var2 = this.ctl.get();
if (workerCountOf(var2) < this.corePoolSize) {
if (this.addWorker(var1, true)) {
return;
}
var2 = this.ctl.get();
}
if (isRunning(var2) && this.workQueue.offer(var1)) {
int var3 = this.ctl.get();
if (!isRunning(var3) && this.remove(var1)) {
this.reject(var1);
} else if (workerCountOf(var3) == 0) {
// 为了保证线程池在RUNNING状态下必须要有一个线程来执行任务
this.addWorker((Runnable)null, false);
}
} else if (!this.addWorker(var1, false)) {
this.reject(var1);
}
}
}
3-4-3-2 addWorker执行全流程
private boolean addWorker(Runnable var1, boolean var2) {
while(true) {
int var3 = this.ctl.get();
int var4 = runStateOf(var3);
if (var4 >= 0 && (var4 != 0 || var1 != null || this.workQueue.isEmpty())) {
return false;
}
while(true) {
int var5 = workerCountOf(var3);
if (var5 >= 536870911 || var5 >= (var2 ? this.corePoolSize : this.maximumPoolSize)) {
return false;
}
if (this.compareAndIncrementWorkerCount(var3)) {
boolean var18 = false;
boolean var19 = false;
ThreadPoolExecutor.Worker var20 = null;
try {
// 根据firstTask来创建Worker对象
var20 = new ThreadPoolExecutor.Worker(var1);
// 每一个Worker对象都会创建一个线程
Thread var6 = var20.thread;
if (var6 != null) {
ReentrantLock var7 = this.mainLock;
var7.lock();
try {
int var8 = runStateOf(this.ctl.get());
// rs < 0表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
if (var8 < 0 || var8 == 0 && var1 == null) {
if (var6.isAlive()) {
throw new IllegalThreadStateException();
}
this.workers.add(var20);
int var9 = this.workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (var9 > this.largestPoolSize) {
this.largestPoolSize = var9;
}
var19 = true;
}
} finally {
var7.unlock();
}
if (var19) {
// 启动线程
var6.start();
var18 = true;
}
}
} finally {
if (!var18) {
this.addWorkerFailed(var20);
}
}
return var18;
}
var3 = this.ctl.get();
// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(var3) != var4) {
break;
}
}
}
}
3-4-3-3 runWorker执行全流程
// 使用AQS来实现独占锁
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread; // 用来处理任务的线程
Runnable firstTask; // 传入的任务
volatile long completedTasks;
Worker(Runnable var2) {
//tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断
this.setState(-1);
this.firstTask = var2;
// newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
}
public void run() {
ThreadPoolExecutor.this.runWorker(this);
}
protected boolean isHeldExclusively() {
return this.getState() != 0;
}
protected boolean tryAcquire(int var1) {
if (this.compareAndSetState(0, 1)) {
this.setExclusiveOwnerThread(Thread.currentThread());
return true;
} else {
return false;
}
}
protected boolean tryRelease(int var1) {
this.setExclusiveOwnerThread((Thread)null);
this.setState(0);
return true;
}
public void lock() {
this.acquire(1);
}
public boolean tryLock() {
return this.tryAcquire(1);
}
public void unlock() {
this.release(1);
}
public boolean isLocked() {
return this.isHeldExclusively();
}
void interruptIfStarted() {
Thread var1;
if (this.getState() >= 0 && (var1 = this.thread) != null && !var1.isInterrupted()) {
try {
var1.interrupt();
} catch (SecurityException var3) {
}
}
}
}
final void runWorker(ThreadPoolExecutor.Worker var1) {
Thread var2 = Thread.currentThread();
// 获取第一个任务
Runnable var3 = var1.firstTask;
var1.firstTask = null;
// 允许中断
var1.unlock();
boolean var4 = true;
try {
// // 如果task为空,则通过getTask来获取任务
while(var3 != null || (var3 = this.getTask()) != null) {
var1.lock();
if ((runStateAtLeast(this.ctl.get(), 536870912) || Thread.interrupted() && runStateAtLeast(this.ctl.get(), 536870912)) && !var2.isInterrupted()) {
var2.interrupt();
}
try {
this.beforeExecute(var2, var3);
Object var5 = null;
try {
var3.run();
} catch (RuntimeException var28) {
var5 = var28;
throw var28;
} catch (Error var29) {
var5 = var29;
throw var29;
} catch (Throwable var30) {
var5 = var30;
throw new Error(var30);
} finally {
this.afterExecute(var3, (Throwable)var5);
}
} finally {
var3 = null;
++var1.completedTasks;
var1.unlock();
}
}
var4 = false;
} finally {
this.processWorkerExit(var1, var4);
}
}
private void processWorkerExit(ThreadPoolExecutor.Worker var1, boolean var2) {
if (var2) {
this.decrementWorkerCount();
}
ReentrantLock var3 = this.mainLock;
var3.lock();
try {
//统计完成的任务数
this.completedTaskCount += var1.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
this.workers.remove(var1);
} finally {
var3.unlock();
}
// 根据线程池状态进行判断是否结束线程池
this.tryTerminate();
int var4 = this.ctl.get();
// 536870912 -> STOP的取值,小于STOP的状态是运行状态或者SHUTDOWN
if (runStateLessThan(var4, 536870912)) {
// 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
if (!var2) {
// 如果允许核心线程超时,最小值为0,否则为corePoolSize
int var5 = this.allowCoreThreadTimeOut ? 0 : this.corePoolSize;
// 如果最小值为0,同时任务队列不空,则更新最小值为1
if (var5 == 0 && !this.workQueue.isEmpty()) {
var5 = 1;
}
// 工作线程数大于等于最小值,直接返回不新增非核心线程
if (workerCountOf(var4) >= var5) {
return;
}
}
this.addWorker((Runnable)null, false);
}
}
private Runnable getTask() {
boolean var1 = false;
while(true) {
int var2 = this.ctl.get();
int var3 = runStateOf(var2);
/** 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
1. rs >= STOP,线程池是否正在stop;
2. 阻塞队列是否为空。
如果以上条件满足,则将workerCount减1并返回null。
因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
*/
if (var3 >= 0 && (var3 >= 536870912 || this.workQueue.isEmpty())) {
this.decrementWorkerCount();
return null;
}
int var4 = workerCountOf(var2);
boolean var5 = this.allowCoreThreadTimeOut || var4 > this.corePoolSize;
if (var4 <= this.maximumPoolSize && (!var5 || !var1) || var4 <= 1 && !this.workQueue.isEmpty()) {
try {
// 根据var5来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null(其实就是,在keepAliveTime时间内能否获取到任务)
// 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空
Runnable var6 = var5 ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS) : (Runnable)this.workQueue.take();
if (var6 != null) {
return var6;
}
var1 = true;
} catch (InterruptedException var7) {
var1 = false;
}
} else if (this.compareAndDecrementWorkerCount(var2)) {
return null;
}
}
}
工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束。
线程池监控:
public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量
3-5 定时线程池(ScheduledThreadPoolExecutor)
3-5-1 类结构图
3-5-2 简单使用
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
long cur = System.currentTimeMillis();
System.out.println("执行开始时间:" + cur);
// 延时2s执行任务
ScheduledFuture<?> future = scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("Hello World");
return 1;
}, 2000, TimeUnit.MILLISECONDS);
System.out.println("执行完成时间:" + (System.currentTimeMillis() - cur));
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
但是,这个只能执行一次,如果想定时执行该怎么办呢?答:使用scheduleAtFixedRate方法,scheduleAtFixedRate(task,time,period,unit)
task-所要安排的任务,time-首次执行任务的时间,period-执行一次task的时间间隔,unit-单位
作用:时间等于或超过time首次执行task,之后每隔period后重复执行task
public class ThreadPool {
private static Integer count =1;
MyTimerTask myTimerTask = new MyTimerTask();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
public void start(){
try {
//从第一次任务开始时延时0s,一秒执行一次
scheduledThreadPoolExecutor.scheduleAtFixedRate(myTimerTask, 0,1, TimeUnit.SECONDS);
while (!scheduledThreadPoolExecutor.isTerminated()){
lock.readLock().lock();
if (count >20){
scheduledThreadPoolExecutor.shutdown();
}
lock.readLock().unlock();
}
}catch(Exception e){
e.printStackTrace();
}
System.out.println("Finished all threads");
}
private class MyTimerTask implements Runnable {
@Override
public void run(){
lock.writeLock().lock();
System.out.println("第 "+count+ " 次执行任务,count="+count);
count ++;
lock.writeLock().unlock();
}
}
public static void main(String[] args) {
new ThreadPool().start();
}
}
当任务的执行时间大于下一任务开始执行的时间,会出现什么情况呢?
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println(new Date() + ": 任务执行完成。。。");
},1000,2000,TimeUnit.MILLISECONDS);
}
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
long startTime = System.currentTimeMillis();
System.out.println(new Date(startTime) + ": 任务开始执行。。。");
long nowTime = startTime;
while ((nowTime - startTime) < 5000) {
nowTime = System.currentTimeMillis();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(new Date(nowTime) + ": 任务执行完成。。。");
},1000,2000,TimeUnit.MILLISECONDS);
}
当任务的执行时间大于下一任务开始执行时间,会导致后续任务进行积压,当前任务执行完成后再继续执行后面的任务。当任务数量过多时,积压这么多也不是办法,有什么方法可以保证上一任务执行完成后 延时指定的时间后 再继续执行下一任务呢? 答:scheduleWithFixedDelay方法,
scheduleWithFixedDelay(task,time,delay,unit)
task-所要安排的任务, time-首次执行任务的时间 ,delay-每次执行任务的延迟时间,unit-单位
作用:时间等于或超过time首次执行task,之后每隔delay后重复执行task
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
long startTime = System.currentTimeMillis();
System.out.println(new Date(startTime) + ": 任务开始执行。。。");
long nowTime = startTime;
while ((nowTime - startTime) < 5000) {
nowTime = System.currentTimeMillis();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(new Date(nowTime) + ": 任务执行完成。。。");
},1000,2000,TimeUnit.MILLISECONDS);
}
当执行任务抛出异常,系统会报错吗?答:不会,因为线程池中try…catcn了异常
public static void main(String[] args) {
System.out.println(111);
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
System.out.println(new Date() + ": 任务执行完成。。。");
throw new RuntimeException("error...");
},1000,2000,TimeUnit.MILLISECONDS);
System.out.println(222);
}
3-5-3 使用场景
3-5-3-1 分布式锁
3-5-3-2 SpringCloud-服务注册与发现中心
3-5-3 源码解析
它接收SchduledFutureTask类型的任务,是线程池调度任务的最小单位,有三种提交任务
的方式:
1.schedule
2.scheduledAtFixedRate
3.scheduledWithFixedDelay
它采用DelayQueue存储等待的任务
1.DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若
time相同则根据sequenceNumber排序;
2.DelayQueue也是一个无界队列;
基本方法与ThreadPoolExecutor中的一样,接下来就介绍一下它独有的处理方式。
3-5-3-1 构造方法
public ScheduledThreadPoolExecutor(int var1) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int var1, ThreadFactory var2) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2);
}
public ScheduledThreadPoolExecutor(int var1, RejectedExecutionHandler var2) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2);
}
public ScheduledThreadPoolExecutor(int var1, ThreadFactory var2, RejectedExecutionHandler var3) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2, var3);
}
调ThreadPoolExecutor类中的构造方法,
public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6) {
this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7) {
this(var1, var2, var3, var5, var6, var7, defaultHandler);
}
public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, RejectedExecutionHandler var7) {
this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), var7);
}
public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7, RejectedExecutionHandler var8) {
this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
this.mainLock = new ReentrantLock();
this.workers = new HashSet();
this.termination = this.mainLock.newCondition();
if (var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) {
if (var6 != null && var7 != null && var8 != null) {
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = var1;
this.maximumPoolSize = var2;
this.workQueue = var6;
this.keepAliveTime = var5.toNanos(var3);
this.threadFactory = var7;
this.handler = var8;
} else {
throw new NullPointerException();
}
} else {
throw new IllegalArgumentException();
}
}
3-5-3-2 schedule/scheduleAtFixedRate/scheduleWithFixedDelay方法
public <V> ScheduledFuture<V> schedule(Callable<V> var1, long var2, TimeUnit var4) {
if (var1 != null && var4 != null) {
RunnableScheduledFuture var5 = this.decorateTask((Callable)var1, new ScheduledThreadPoolExecutor.ScheduledFutureTask(var1, this.triggerTime(var2, var4)));
this.delayedExecute(var5);
return var5;
} else {
throw new NullPointerException();
}
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable var1, long var2, long var4, TimeUnit var6) {
if (var1 != null && var6 != null) {
if (var4 <= 0L) {
throw new IllegalArgumentException();
} else {
ScheduledThreadPoolExecutor.ScheduledFutureTask var7 = new ScheduledThreadPoolExecutor.ScheduledFutureTask(var1, (Object)null, this.triggerTime(var2, var6), var6.toNanos(var4));
RunnableScheduledFuture var8 = this.decorateTask((Runnable)var1, var7);
var7.outerTask = var8;
this.delayedExecute(var8);
return var8;
}
} else {
throw new NullPointerException();
}
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable var1, long var2, long var4, TimeUnit var6) {
if (var1 != null && var6 != null) {
if (var4 <= 0L) {
throw new IllegalArgumentException();
} else {
ScheduledThreadPoolExecutor.ScheduledFutureTask var7 = new ScheduledThreadPoolExecutor.ScheduledFutureTask(var1, (Object)null, this.triggerTime(var2, var6), var6.toNanos(-var4));
RunnableScheduledFuture var8 = this.decorateTask((Runnable)var1, var7);
var7.outerTask = var8;
this.delayedExecute(var8);
return var8;
}
} else {
throw new NullPointerException();
}
}
private void delayedExecute(RunnableScheduledFuture<?> var1) {
//如果线程池已经关闭,则使用拒绝策略把提交任务拒绝掉
if (this.isShutdown()) {
this.reject(var1);
} else {
//与ThreadPoolExecutor不同,这里直接把任务加入延迟队列(使用的是DelayedWorkQueue)
super.getQueue().add(var1);
//如果当前状态无法执行任务,则取消
if (this.isShutdown() && !this.canRunInCurrentRunState(var1.isPeriodic()) && this.remove(var1)) {
var1.cancel(false);
} else {
//这里是增加一个worker线程,避免提交的任务没有worker去执行。原因就是该类没有像ThreadPoolExecutor一样,woker满了才放入队列
this.ensurePrestart();
}
}
}
// 由用户自己来实现特殊逻辑
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> var1, RunnableScheduledFuture<V> var2) {
return var2;
}
void ensurePrestart() {
int var1 = workerCountOf(this.ctl.get());
if (var1 < this.corePoolSize) {
this.addWorker((Runnable)null, true);
} else if (var1 == 0) {
this.addWorker((Runnable)null, false);
}
}
3-5-3-3 ScheduledFutureTask
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber; // 任务的序号
private long time; // 任务开始的时间
private final long period; // 任务执行的时间间隔
RunnableScheduledFuture<V> outerTask = this;
int heapIndex;
ScheduledFutureTask(Runnable var2, V var3, long var4) {
super(var2, var3);
this.time = var4;
this.period = 0L;
this.sequenceNumber = ScheduledThreadPoolExecutor.sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable var2, V var3, long var4, long var6) {
super(var2, var3);
this.time = var4;
this.period = var6;
this.sequenceNumber = ScheduledThreadPoolExecutor.sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> var2, long var3) {
super(var2);
this.time = var3;
this.period = 0L;
this.sequenceNumber = ScheduledThreadPoolExecutor.sequencer.getAndIncrement();
}
public long getDelay(TimeUnit var1) {
return var1.convert(this.time - ScheduledThreadPoolExecutor.this.now(), TimeUnit.NANOSECONDS);
}
// ScheduledThreadPoolExecutor会把待执行的任务放到工作队列DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序
public int compareTo(Delayed var1) {
if (var1 == this) {
return 0;
} else if (var1 instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {
ScheduledThreadPoolExecutor.ScheduledFutureTask var5 = (ScheduledThreadPoolExecutor.ScheduledFutureTask)var1;
long var3 = this.time - var5.time;
if (var3 < 0L) {
return -1;
} else if (var3 > 0L) {
return 1;
} else {
return this.sequenceNumber < var5.sequenceNumber ? -1 : 1;
}
} else {
long var2 = this.getDelay(TimeUnit.NANOSECONDS) - var1.getDelay(TimeUnit.NANOSECONDS);
return var2 < 0L ? -1 : (var2 > 0L ? 1 : 0);
}
}
public boolean isPeriodic() {
return this.period != 0L;
}
private void setNextRunTime() {
long var1 = this.period;
if (var1 > 0L) {
this.time += var1;
} else {
this.time = ScheduledThreadPoolExecutor.this.triggerTime(-var1);
}
}
public boolean cancel(boolean var1) {
boolean var2 = super.cancel(var1);
if (var2 && ScheduledThreadPoolExecutor.this.removeOnCancel && this.heapIndex >= 0) {
ScheduledThreadPoolExecutor.this.remove(this);
}
return var2;
}
public void run() {
boolean var1 = this.isPeriodic();
//如果当前线程池已经不支持执行任务,则取消
if (!ScheduledThreadPoolExecutor.this.canRunInCurrentRunState(var1)) {
this.cancel(false);
} else if (!var1) { //如果不需要周期性执行,则直接执行run方法然后结束
ScheduledFutureTask.super.run();
} else if (ScheduledFutureTask.super.runAndReset()) { //如果需要周期执行,则在执行完任务以后,设置下一次执行时间
// 计算下次执行该任务的时间
this.setNextRunTime();
//重复执行任务
ScheduledThreadPoolExecutor.this.reExecutePeriodic(this.outerTask);
}
}
}
void reExecutePeriodic(RunnableScheduledFuture<?> var1) {
if (this.canRunInCurrentRunState(true)) {
super.getQueue().add(var1);
if (!this.canRunInCurrentRunState(true) && this.remove(var1)) {
var1.cancel(false);
} else {
this.ensurePrestart();
}
}
}
3-5-3-4 DelayedWorkQueue
为什么要使用DelayedWorkQueue呢?
答:定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当
前队列中执行时间最靠前的,所以自然要使用优先级队列。
DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中
执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时
间复杂度是 O(logN)。
1)线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务 是指ScheduledFutureTask的time大于等于当前时间。
2)线程1执行这个ScheduledFutureTask。
3)线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
4)线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture[16];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
private Thread leader = null;
private final Condition available;
DelayedWorkQueue() {
this.available = this.lock.newCondition();
}
private void setIndex(RunnableScheduledFuture<?> var1, int var2) {
if (var1 instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {
((ScheduledThreadPoolExecutor.ScheduledFutureTask)var1).heapIndex = var2;
}
}
private void siftUp(int var1, RunnableScheduledFuture<?> var2) {
while(true) {
if (var1 > 0) {
int var3 = var1 - 1 >>> 1;
RunnableScheduledFuture var4 = this.queue[var3];
if (var2.compareTo(var4) < 0) {
this.queue[var1] = var4;
this.setIndex(var4, var1);
var1 = var3;
continue;
}
}
this.queue[var1] = var2;
this.setIndex(var2, var1);
return;
}
}
private void siftDown(int var1, RunnableScheduledFuture<?> var2) {
int var4;
for(int var3 = this.size >>> 1; var1 < var3; var1 = var4) {
var4 = (var1 << 1) + 1;
RunnableScheduledFuture var5 = this.queue[var4];
int var6 = var4 + 1;
if (var6 < this.size && var5.compareTo(this.queue[var6]) > 0) {
var4 = var6;
var5 = this.queue[var6];
}
if (var2.compareTo(var5) <= 0) {
break;
}
this.queue[var1] = var5;
this.setIndex(var5, var1);
}
this.queue[var1] = var2;
this.setIndex(var2, var1);
}
private void grow() {
int var1 = this.queue.length;
int var2 = var1 + (var1 >> 1);
if (var2 < 0) {
var2 = 2147483647;
}
this.queue = (RunnableScheduledFuture[])Arrays.copyOf(this.queue, var2);
}
private int indexOf(Object var1) {
if (var1 != null) {
int var2;
if (var1 instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {
var2 = ((ScheduledThreadPoolExecutor.ScheduledFutureTask)var1).heapIndex;
if (var2 >= 0 && var2 < this.size && this.queue[var2] == var1) {
return var2;
}
} else {
for(var2 = 0; var2 < this.size; ++var2) {
if (var1.equals(this.queue[var2])) {
return var2;
}
}
}
}
return -1;
}
public boolean contains(Object var1) {
ReentrantLock var2 = this.lock;
var2.lock();
boolean var3;
try {
var3 = this.indexOf(var1) != -1;
} finally {
var2.unlock();
}
return var3;
}
public boolean remove(Object var1) {
ReentrantLock var2 = this.lock;
var2.lock();
boolean var4;
try {
int var3 = this.indexOf(var1);
if (var3 >= 0) {
this.setIndex(this.queue[var3], -1);
int var10 = --this.size;
RunnableScheduledFuture var5 = this.queue[var10];
this.queue[var10] = null;
if (var10 != var3) {
this.siftDown(var3, var5);
if (this.queue[var3] == var5) {
this.siftUp(var3, var5);
}
}
boolean var6 = true;
return var6;
}
var4 = false;
} finally {
var2.unlock();
}
return var4;
}
public int size() {
ReentrantLock var1 = this.lock;
var1.lock();
int var2;
try {
var2 = this.size;
} finally {
var1.unlock();
}
return var2;
}
public boolean isEmpty() {
return this.size() == 0;
}
public int remainingCapacity() {
return 2147483647;
}
public RunnableScheduledFuture<?> peek() {
ReentrantLock var1 = this.lock;
var1.lock();
RunnableScheduledFuture var2;
try {
var2 = this.queue[0];
} finally {
var1.unlock();
}
return var2;
}
public boolean offer(Runnable var1) {
if (var1 == null) {
throw new NullPointerException();
} else {
RunnableScheduledFuture var2 = (RunnableScheduledFuture)var1;
ReentrantLock var3 = this.lock;
var3.lock();
try {
int var4 = this.size;
if (var4 >= this.queue.length) {
this.grow();
}
this.size = var4 + 1;
if (var4 == 0) {
this.queue[0] = var2;
this.setIndex(var2, 0);
} else {
this.siftUp(var4, var2);
}
if (this.queue[0] == var2) {
this.leader = null;
this.available.signal();
}
} finally {
var3.unlock();
}
return true;
}
}
public void put(Runnable var1) {
this.offer(var1);
}
public boolean add(Runnable var1) {
return this.offer(var1);
}
public boolean offer(Runnable var1, long var2, TimeUnit var4) {
return this.offer(var1);
}
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> var1) {
int var2 = --this.size;
RunnableScheduledFuture var3 = this.queue[var2];
this.queue[var2] = null;
if (var2 != 0) {
this.siftDown(0, var3);
}
this.setIndex(var1, -1);
return var1;
}
public RunnableScheduledFuture<?> poll() {
ReentrantLock var1 = this.lock;
var1.lock();
RunnableScheduledFuture var3;
try {
RunnableScheduledFuture var2 = this.queue[0];
if (var2 != null && var2.getDelay(TimeUnit.NANOSECONDS) <= 0L) {
var3 = this.finishPoll(var2);
return var3;
}
var3 = null;
} finally {
var1.unlock();
}
return var3;
}
public RunnableScheduledFuture<?> take() throws InterruptedException {
ReentrantLock var1 = this.lock;
var1.lockInterruptibly();
try {
while(true) {
while(true) {
RunnableScheduledFuture var2 = this.queue[0];
if (var2 != null) {
long var3 = var2.getDelay(TimeUnit.NANOSECONDS);
if (var3 <= 0L) {
RunnableScheduledFuture var14 = this.finishPoll(var2);
return var14;
}
var2 = null;
if (this.leader != null) {
this.available.await();
} else {
Thread var5 = Thread.currentThread();
this.leader = var5;
try {
this.available.awaitNanos(var3);
} finally {
if (this.leader == var5) {
this.leader = null;
}
}
}
} else {
this.available.await();
}
}
}
} finally {
if (this.leader == null && this.queue[0] != null) {
this.available.signal();
}
var1.unlock();
}
}
public RunnableScheduledFuture<?> poll(long var1, TimeUnit var3) throws InterruptedException {
long var4 = var3.toNanos(var1);
ReentrantLock var6 = this.lock;
var6.lockInterruptibly();
try {
while(true) {
RunnableScheduledFuture var7 = this.queue[0];
if (var7 != null) {
long var22 = var7.getDelay(TimeUnit.NANOSECONDS);
if (var22 <= 0L) {
RunnableScheduledFuture var21 = this.finishPoll(var7);
return var21;
}
Thread var10;
if (var4 <= 0L) {
var10 = null;
return var10;
}
var7 = null;
if (var4 >= var22 && this.leader == null) {
var10 = Thread.currentThread();
this.leader = var10;
try {
long var11 = this.available.awaitNanos(var22);
var4 -= var22 - var11;
} finally {
if (this.leader == var10) {
this.leader = null;
}
}
} else {
var4 = this.available.awaitNanos(var4);
}
} else {
if (var4 <= 0L) {
Object var8 = null;
return (RunnableScheduledFuture)var8;
}
var4 = this.available.awaitNanos(var4);
}
}
} finally {
if (this.leader == null && this.queue[0] != null) {
this.available.signal();
}
var6.unlock();
}
}
public void clear() {
ReentrantLock var1 = this.lock;
var1.lock();
try {
for(int var2 = 0; var2 < this.size; ++var2) {
RunnableScheduledFuture var3 = this.queue[var2];
if (var3 != null) {
this.queue[var2] = null;
this.setIndex(var3, -1);
}
}
this.size = 0;
} finally {
var1.unlock();
}
}
private RunnableScheduledFuture<?> peekExpired() {
RunnableScheduledFuture var1 = this.queue[0];
return var1 != null && var1.getDelay(TimeUnit.NANOSECONDS) <= 0L ? var1 : null;
}
public int drainTo(Collection<? super Runnable> var1) {
if (var1 == null) {
throw new NullPointerException();
} else if (var1 == this) {
throw new IllegalArgumentException();
} else {
ReentrantLock var2 = this.lock;
var2.lock();
try {
RunnableScheduledFuture var3;
int var4;
for(var4 = 0; (var3 = this.peekExpired()) != null; ++var4) {
var1.add(var3);
this.finishPoll(var3);
}
int var5 = var4;
return var5;
} finally {
var2.unlock();
}
}
}
public int drainTo(Collection<? super Runnable> var1, int var2) {
if (var1 == null) {
throw new NullPointerException();
} else if (var1 == this) {
throw new IllegalArgumentException();
} else if (var2 <= 0) {
return 0;
} else {
ReentrantLock var3 = this.lock;
var3.lock();
try {
RunnableScheduledFuture var4;
int var5;
for(var5 = 0; var5 < var2 && (var4 = this.peekExpired()) != null; ++var5) {
var1.add(var4);
this.finishPoll(var4);
}
int var6 = var5;
return var6;
} finally {
var3.unlock();
}
}
}
public Object[] toArray() {
ReentrantLock var1 = this.lock;
var1.lock();
Object[] var2;
try {
var2 = Arrays.copyOf(this.queue, this.size, Object[].class);
} finally {
var1.unlock();
}
return var2;
}
public <T> T[] toArray(T[] var1) {
ReentrantLock var2 = this.lock;
var2.lock();
Object[] var3;
try {
if (var1.length < this.size) {
var3 = (Object[])Arrays.copyOf(this.queue, this.size, var1.getClass());
return var3;
}
System.arraycopy(this.queue, 0, var1, 0, this.size);
if (var1.length > this.size) {
var1[this.size] = null;
}
var3 = var1;
} finally {
var2.unlock();
}
return var3;
}
public Iterator<Runnable> iterator() {
return new ScheduledThreadPoolExecutor.DelayedWorkQueue.Itr((RunnableScheduledFuture[])Arrays.copyOf(this.queue, this.size));
}
private class Itr implements Iterator<Runnable> {
final RunnableScheduledFuture<?>[] array;
int cursor = 0;
int lastRet = -1;
Itr(RunnableScheduledFuture<?>[] var2) {
this.array = var2;
}
public boolean hasNext() {
return this.cursor < this.array.length;
}
public Runnable next() {
if (this.cursor >= this.array.length) {
throw new NoSuchElementException();
} else {
this.lastRet = this.cursor;
return this.array[this.cursor++];
}
}
public void remove() {
if (this.lastRet < 0) {
throw new IllegalStateException();
} else {
DelayedWorkQueue.this.remove(this.array[this.lastRet]);
this.lastRet = -1;
}
}
}
}