线程池
线程池是一组线程的集合。线程池维护一个队列,调用者向这个队列中添加任务,而线程池中的线程则不停地从队列中取出任务执行。
线程池的继承关系如下图,其中
ThreadPoolExecutor和
ScheduledThreadPoolExecutor是具体的实现。
ThreadPoolExecutor
基本的线程池,可以提交无返回值和有返回值的任务执行。重载的构造方法最长有七个参数,分比是
核心线程(常驻线程)个数,最大线程个数(当核心线程已满,任务队列也满,最大扩充线程的个数),非核心线程空闲的时间,时间单位,阻塞任务队列,线程生产工厂,饱和策略(用来处理核心线程已满,最大线程已满,任务队列满时的新到来任务的策略,也叫拒绝策略)。
其中,饱和策略共四种:
1.DiscardOldestPolicy,抛弃最早的任务,将新任务加入队列。
2.AbortPolicy,拒绝执行新任务,并抛出异常。
3.CallerRunsPolicy,交由调用者线程执行新任务,如果调用者线程已关闭,则抛弃任务。
4.DiscardPolicy,直接抛弃新任务。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
使用
一般在使用线程池时需要自行定制,一般来说普遍的理论是:
如果是CPU密集型任务:那么核心线程数 = CPU核心数 + 1;加1是为了防止突发情况导致某个线程不可执行,最大化利用CPU。
如果是IO密集型任务:那么核心线程数 = CPU核心数 * 2;
实际在应用过程中许多任务也分不出是IO密集还是CPU密集,上述理论也不一定是最好的,如果对性能追求极致,需要自行探索,也可以通过线程池的一些set方法动态修改核心线程数,空闲时间等参数。
下面是一个简单的例子:
package com.example.demo;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MyThreadPool {
private static final ThreadPoolExecutor executor;
//线程所属的线程组
private static final ThreadGroup group = new ThreadGroup("my-group");
//线程名字的前缀
private static final String prefix = "my-thread-";
//线程计数器
private static final AtomicInteger atomicInteger=new AtomicInteger(0);
static {
int core = Runtime.getRuntime().availableProcessors();
ThreadFactory factory= r -> new Thread(group, r, prefix+atomicInteger.incrementAndGet());
executor = new ThreadPoolExecutor(
core,
core*2,
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(32),
factory,
//一共四种策略,这里采用调用者执行策略
new ThreadPoolExecutor.CallerRunsPolicy()
);
//虚拟机退出时关闭线程池
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
executor.shutdown();
try {
boolean flag;
do {
//一直等待线程池关闭完成
flag = executor.awaitTermination(1,TimeUnit.SECONDS);
} while (!flag);
} catch (Exception e){
e.printStackTrace();
}
}
});
}
public static ThreadPoolExecutor getExecutor(){
return executor;
}
}
测试自定义的线程池
package com.example.demo;
import org.junit.Test;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestClass {
Random random = new Random();
@Test
public void test() throws InterruptedException {
//采用闭锁等待所有线程池线程执行完成
CountDownLatch latch=new CountDownLatch(50);
ThreadPoolExecutor executor = MyThreadPool.getExecutor();
for (int i=0;i<50;i++) {
final int j = i;
executor.execute(()->{
try {
System.out.println(String.format("我是线程:%s,执行第%d个任务",
Thread.currentThread().getName(),j));
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
}
}
实现原理
基本结构
部分源码如下,包含了主要的属性
public class ThreadPoolExecutor extends AbstractExecutorService {
//线程池状态控制变量,一个int包含两部分,低29位为workCount,高3位为runState,初始为RUNNING状态,workCount=0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//workCount最大2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池运行的状态,共5种
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任务队列
private final BlockingQueue<Runnable> workQueue;
//访问变量用的互斥锁
private final ReentrantLock mainLock = new ReentrantLock();
//任务线程的集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//awaitTermination()方法的条件变量
private final Condition termination = mainLock.newCondition();
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
execute方法
分3步进行:
1、当前工作线程数与corePoolSize比较,如果小于则尝试启动一个新线程,使用给定的参数command作为其第一个任务,否则执行2步骤
2、如果任务能够成功入队,则再次检查运行状态,如果不是运行状态则从队列中移除任务;如果不能入队则执行3步骤
3、如果不能将任务入队,则尝试添加一个新的非核心线程。如果增加失败了,则执行拒绝策略。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断线程个数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果是运行状态,则入队,任务队列没满则入队成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再检查状态是不是运行状态,不是则尝试从队列中移除任务,移除成功则执行饱和策略
if (! isRunning(recheck) && remove(command))
reject(command);
//启动一个线程,什么都不做
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//核心线程、任务队列都满了,增加非核心线程,如果也增加失败,则执行饱和策略
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//获取运行状态
int rs = runStateOf(c);
// 运行状态>= SHUTDOWN,说明线程池进入关闭状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
//获取workCount
int wc = workerCountOf(c);
//线程超出个数,返回失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS操作,将workCount+1,成功则退出循环,否则重新读取控制变量
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//CAS失败,如果状态不变继续进行内部for循环,如果变了则跳到外部for循环重新开始
if (runStateOf(c) != rs)
continue retry;
}
}
//标识任务已启动
boolean workerStarted = false;
//标识任务已新增
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//线程是活跃状态
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
//更新最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果新增成功,则启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//线程没启动成功,当做增加失败处理,addWorkerFailed内部把workCount减1
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
submit方法
submit方法定义在
AbstractExecutorService中:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
//封装成RunnableFuture执行,RunnableFuture实现了Runnable
RunnableFuture<T> ftask = newTaskFor(task);
//执行
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
Callable
在线程池中执行的任务有两种类型,一种是无参无返回值的,例如Runnable类型的,还有一种是无参有返回值的,即为Callable类型的。Callable接口如下定义:
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Worker
任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker。但对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程。
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {
//.......
/** 运行worker的线程. */
final Thread thread;
/** 接收的第一个任务 */
Runnable firstTask;
/** 执行完成的任务个数 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); //初始为RUNNING状态
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
//.......
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//不停地从队列中取出任务
while (task != null || (task = getTask()) != null) {
w.lock();//执行任务之前先上锁,AQS实现
//检测线程池状态,如果状态>=STOP,则线程给自己发中断信号
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//钩子函数
afterExecute(task, thrown);
}
} finally {
task = null;
//完成的任务数+1
w.completedTasks++;
w.unlock();
}
}
//可根据此变量判断Worker以何种方式退出(正常,中断,其他异常)
completedAbruptly = false;
} finally {
//清理worker
processWorkerExit(w, completedAbruptly);
}
}
钩子方法
ThreadPoolExecutor提供了下列的钩子方法,如果要继承ThreadPoolExecutor实现自己的线程池,可以考虑实现下列方法。
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
关闭线程池
当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。
状态转换过程
线程池有两个关闭函数,shutdown()和shutdownNow(),这两个函数会让线程池切换到不同的状态。但无论是哪个状态,在队列为空且线程池也为空之后,会切换为TIDYING 状态;最后执行一个钩子方法terminated(),进入TERMINATED状态,线程池才算真正关闭。
状态迁移是从小到大,按照-1,0,1,2,3的顺序,只会正向迁移,不能颠倒。
两种shutdown的区别
shutdown()方法不会清空任务队列,会等所有任务执行完成,而shutdownNow()会清空任务队列。
shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程。
tryTerminate()不会强行终止线程池,只是做了一下检测:当workerCount为0,workerQueue为空时,先把状态切换到TIDYING,然后调用钩子函数terminated()。当钩子函数执行完成时,把状态从TIDYING 改为TERMINATED,接着调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。
部分源码如下:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有关闭线程池的权限
checkShutdownAccess();
//CAS操作,将线程池状态迁移为SHUTDOWN
advanceRunState(SHUTDOWN);
//关闭空闲的线程
interruptIdleWorkers();
//给ScheduledThreadPoolExecutor留的钩子方法
onShutdown();
} finally {
mainLock.unlock();
}
//迁移到TERMINATED状态
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有关闭线程池的权限
checkShutdownAccess();
//CAS操作,将线程池状态迁移为STOP
advanceRunState(STOP);
//关闭所有线程
interruptWorkers();
//移除任务队列的元素并返回它们
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//迁移到TERMINATED状态
tryTerminate();
return tasks;
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//线程未中断且Worker尝试加锁成功,说明线程处于空闲状态,如果加锁失败,说明线程当前持有锁,正在执行任务
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//中断所有线程
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
//任务队列为空,workCount=0会走到这里
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//迁移到TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//调用钩子函数
terminated();
} finally {
//迁移到TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
//通知等待的awaitTermination()方法
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// 迁移状态失败则重新循环
}
}
推荐的关闭方式
executor.shutdown();
try {
boolean flag;
do {
//一直等待线程池关闭完成
flag = executor.awaitTermination(1,TimeUnit.SECONDS);
} while (!flag);
} catch (Exception e){
e.printStackTrace();
}
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,除了基本的线程池功能,还实现了
ScheduledExecutorService,可以延迟执行任务或按频率进行任务调度,具体体现在以下两方面:
1、延迟执行任务
2、周期执行任务
对应的方法如下:
//延迟执行任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit);
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit);
//周期执行任务,按固定频率执行,与任务本身执行时间无关,但是任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,
TimeUnit unit);
//周期执行任务,按固定间隔执行,与任务本身执行时间有关。假如任务本身执行时间是10s,间隔2s,则下一次开始执行的时间就是12s。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
long delay,TimeUnit unit);
使用
调度线程池的构建比较简单,重载的构造方法最多需要3个参数,分别是
核心线程数,线程工厂和饱和策略。下面简单演示了几种延迟调度的方式:
public class TestClass {
private static final ThreadGroup group = new ThreadGroup("schedule-group");
private static final String prefix = "schedule-thread-";
private static final AtomicInteger atomicInteger=new AtomicInteger(0);
private static final ThreadLocal<DateTimeFormatter> timeFormatters=new ThreadLocal();
private DateTimeFormatter getFormatter(){
DateTimeFormatter formatter = timeFormatters.get();
if (Objects.isNull(formatter)){
DateTimeFormatter formatter1=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
timeFormatters.set(formatter1);
return formatter1;
}
return formatter;
}
@Test
public void schedule(){
ThreadFactory factory= r -> new Thread(group, r ,prefix+atomicInteger.incrementAndGet());
int i = Runtime.getRuntime().availableProcessors();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(i, factory, new ThreadPoolExecutor.CallerRunsPolicy());
System.out.println("启动调度线程池:"+getFormatter().format(LocalDateTime.now()));
//延迟执行,执行一次
ScheduledFuture<String> delay = executor.schedule(
() -> "单次延迟执行开始时间:"+getFormatter().format(LocalDateTime.now()),
3, TimeUnit.SECONDS);
//固定频率执行
executor.scheduleAtFixedRate(() ->
System.out.println("固定频率开始执行:"+getFormatter().format(LocalDateTime.now())),
1,3, TimeUnit.SECONDS);
//固定延迟执行
executor.scheduleWithFixedDelay(()->
System.out.println("固定延迟开始执行:"+getFormatter().format(LocalDateTime.now())),
1,3,TimeUnit.SECONDS);
while (!delay.isDone()) {
try {
String s = delay.get();
System.out.println("单次延迟执行结果:"+s);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
break;
}
while (true){}
}
}
实现原理
构造方法调用了
ThreadPoolExecutor的构造方法,最大线程个数设置为了int的最大值,空闲时间也设为0,任务队列也固定为一个自定义延迟队列。
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
基本结构
DelayedWorkQueue
ScheduledThreadPoolExecutor的任务队列使用了一个自定义的延迟队列
DelayedWorkQueue,
跟普通的延迟队列相差不大,这个静态内部类定义如下:
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
//RunnableScheduledFuture的唯一实现是ScheduledFutureTask
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
//队头等待的线程
private Thread leader = null;
//当队列头部有新的任务可用或新线程可能需要成为leader时发出的条件
private final Condition available = lock.newCondition();
//......
}
ScheduledFutureTask
ScheduledThreadPoolExecutor还有一个内部类
ScheduledFutureTask,实现如下:
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//任务序号
private final long sequenceNumber;
//以纳秒为单位,启动任务的时间(周期任务启动时间一直会累加)
private long time;
//周期(以纳秒为单位)用于重复任务。正值表示固定速率执行。负值表示固定延迟执行。0表示非重复任务
private final long period;
//通过reExecutePeriodic()方法重新加入队列的实际任务
RunnableScheduledFuture<V> outerTask = this;
//当前任务在延迟队列中的索引,用于更快的取消任务
int heapIndex;
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
public int compareTo(Delayed other) {
if (other == this)
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
public boolean isPeriodic() {
return period != 0;
}
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();//设置下一次启动的时间
reExecutePeriodic(outerTask);//重新入队
}
}
}
schedule方法(Runnable)
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//把Runnable包装成RunnableScheduledFuture,ScheduledFutureTask是其实现
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
//线程池关闭则执行拒绝策略
if (isShutdown())
reject(task);
else {
//把任务放入延迟队列
super.getQueue().add(task);
//线程池关闭或者 当前线程池状态不能运行任务,且从队列中移除任务成功,则取消任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
//当小于核心线程数只开启新线程(不运行任务)
if (wc < corePoolSize)
addWorker(null, true);
//即使corePoolSize=0,当第一次运行时,也启动一个新线程
else if (wc == 0)
addWorker(null, false);
}
scheduleWithFixedDelay和scheduleAtFixedRate方法
两个方法实现几乎一模一样,唯一的区别是ScheduledFutureTask的构造方法一个是正数,一个是负数,在
ScheduledFutureTask中,正值表示固定速率执行。负值表示固定延迟执行。0表示非重复任务。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));//负数
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));//正数
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
Future
Future的继承关系如下图所示
RunnableFuture
接口定义如下,继承了Runnable和Future两个接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask
这个类是RunnableFuture的实现,内部存有一个Callable,作为一个适配器,将Callable转换为Runnable执行。
基本结构
public class FutureTask<V> implements RunnableFuture<V> {
//任务状态
private volatile int state;
//初始状态
private static final int NEW = 0;
//中间状态
private static final int COMPLETING = 1;
//以下都是最终状态
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
//中间状态
private static final int INTERRUPTING = 5;
//最终状态
private static final int INTERRUPTED = 6;
/** 实际要执行的Callable,通过构造函数传入 */
private Callable<V> callable;
/** 存放执行结果或者抛出的异常 */
private Object outcome;
/** 运行Callable的线程 */
private volatile Thread runner;
/** 在get方法等待的线程链表 */
private volatile WaitNode waiters;
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
}
run方法
public void run() {
//任务状态不是NEW 或者 其他线程在执行任务,结束方法
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//异常存入outcome变量
setException(ex);
}
if (ran)
//返回值存入outcome变量
set(result);
}
} finally {
//置空
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)//处理中断
handlePossibleCancellationInterrupt(s);
}
}
//状态 NEW->COMPLETING->EXCEPTIONAL
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//通知等待的线程
finishCompletion();
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//通知等待的线程
finishCompletion();
}
}
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}
//空实现
done();
callable = null;
}
get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)//不是正在完成状态,无限期等待
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//等待指定时间
throw new TimeoutException();
return report(s);
}
//返回结果
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {//线程中断了则从链表中移除
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 正处于中间状态
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)//入队
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else//阻塞
LockSupport.park(this);
}
}
Executors
这个类是Executor框架的辅助工具类,用来创建预定义的线程池和一些任务(Callable)。在实际环境下,不建议直接使用此工具创建的线程池,特别是无界阻塞队列和不限制线程个数的实现,容易造成OOM,资源耗尽等严重问题。
//每来一个任务,就创建一个线程
ExecutorService executorService1 = Executors.newCachedThreadPool();
//固定线程数的线程池
ExecutorService executorService2 = Executors.newFixedThreadPool(6);
//多线程的调度线程池
ExecutorService executorService3 = Executors.newScheduledThreadPool(8);
//单线程线程池
ExecutorService executorService4 = Executors.newSingleThreadExecutor();
//单线程调度线程池
ExecutorService executorService5 = Executors.newSingleThreadScheduledExecutor();
//forkjoin线程池
ExecutorService executorService6 = Executors.newWorkStealingPool();