这里还是续写上一章博客
线程池与Future:
线程池的实现原理:
下图所示为线程池的实现原理:调用方不断地向线程池中提交任务,线程池中有一组线程,不断地从队列中取 任务,这是一个典型的生产者—消费者模型
要实现这样一个线程池,有⼏个问题需要考虑:
1:队列设置多长?,如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽,如果是有界的,当队 列满了之后,调用方如何处理?
2:线程池中的线程个数是固定的,还是动态变化的?
3:每次提交新任务,是放入队列?还是开新线程?
4:当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?
这里给出一个案例:
package main6 ;
import java. util. concurrent. Callable ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
public class ThreadPoolTest {
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
ExecutorService executorService = Executors . newFixedThreadPool ( 10 ) ;
Future < ? > submit = executorService. submit ( new a ( ) ) ;
System . out. println ( submit+ "66" ) ;
Object o = submit. get ( ) ;
System . out. println ( o+ "11" ) ;
Future submit1 = executorService. submit ( new aa ( ) ) ;
System . out. println ( submit1+ "77" ) ;
Object oo = submit1. get ( ) ;
System . out. println ( oo+ "33" ) ;
executorService. shutdown ( ) ;
new a ( ) . start ( ) ;
}
public static class a extends Thread {
@Override
public void run ( ) {
System . out. println ( currentThread ( ) ) ;
System . out. println ( 1 ) ;
}
}
public static class aa implements Callable {
@Override
public Object call ( ) throws Exception {
int sum = 0 ;
for ( int i = 1 ; i <= 10000 ; i++ ) {
sum += i;
}
System . out. println ( "计算的累加和是:" + sum) ;
return sum;
}
}
}
针对问题4,有3种做法:
1:不使用阻塞队列,只使用一般的线程安全的队列,也无阻塞/唤醒机制,当队列为空时,线程池中的线程 只能睡眠一会⼉,然后醒来去看队列中有没有新任务到来,如此不断轮询
2:不使用阻塞队列,但在队列外部、线程池内部实现了阻塞/唤醒机制(比如当有任务时提醒你去执行),只是没有阻塞队列而已,只是在外面进行总体的阻塞,但他们还是执行的,相当于手动的阻塞调用线程或者阻塞处理线程,比较麻烦,因为要手动
3:使用阻塞队列,没有就阻塞,有就释放
很显然,做法3最完善,既避免了做法2中线程池内部自己实现阻塞/唤醒机制的麻烦,也避免了做法1的睡眠/轮询带来 的资源消耗和延迟,正因为如此,所以接下来要讲的ThreadPoolExector/ScheduledThreadPoolExecutor都是基于阻 塞队列来实现的,而不是一般的队列,⾄此,各式各样的阻塞队列就要派上用场了
一般来说,队列任务是有顺序的给的,只是线程来抢而已,而这样就会考虑是否加上锁(或者CAS)
线程池的类继承体系:
线程池的类继承体系如下图所示:
还是一样的,实线白色箭头是继承,虚线白色箭头是实现,注意,可能并不是直接的继承或者实现,比如:
public class ThreadPoolExecutor extends AbstractExecutorService {
public abstract class AbstractExecutorService implements ExecutorService {
那么ThreadPoolExecutor间接的实现ExecutorService,这里要注意(可能是版本不同出现不同的代码,即修改了)
在这里,有两个核⼼的类: ThreadPoolExector 和 ScheduledThreadPoolExecutor ,后者不仅是可以执行某 个任务,最重要的是还可以周期性地执行任务
向线程池中提交的每个任务,都必须实现 Runnable 接口,通过最上面的 Executor 接口中的execute(Runnable command) 向线程池提交任务
然后,在 ExecutorService 中,定义了线程池的关闭接口 shutdown() ,还定义了可以有返回值的任务, 也就是 Callable ,后面会详细介绍(前面案例也给出了具体实现)
ThreadPoolExecutor:
核⼼数据结构:
基于线程池的实现原理,下面看一下ThreadPoolExector的核⼼数据结构
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger ( ctlOf ( RUNNING , 0 ) ) ;
private final BlockingQueue < Runnable > workQueue;
private final ReentrantLock mainLock = new ReentrantLock ( ) ;
private final HashSet < Worker > workers = new HashSet < Worker > ( ) ;
}
每一个线程是一个Worker对象,Worker是ThreadPoolExector的内部类,核⼼数据结构如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
}
由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁(既然是AQS,那么通常是利用CAS来实现的锁),这把锁有什么用处呢?用于线程池的 关闭、线程执行任务的过程(中的操作)
核⼼配置参数解释:
ThreadPoolExecutor在其构造方法中提供了⼏个核⼼配置参数,来配置不同策略的线程池
public ThreadPoolExecutor ( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if ( corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0 )
throw new IllegalArgumentException ( ) ;
if ( workQueue == null || threadFactory == null || handler == null )
throw new NullPointerException ( ) ;
this . corePoolSize = corePoolSize;
this . maximumPoolSize = maximumPoolSize;
this . workQueue = workQueue;
this . keepAliveTime = unit. toNanos ( keepAliveTime) ;
this . threadFactory = threadFactory;
this . handler = handler;
}
上面的各个参数,解释如下:
1:corePoolSize:在线程池中始终维护的线程个数
2:maximumPoolSize:在corePooSize已满、队列也满的情况下,扩充线程⾄此值(但是始终维护的还是corePoolSize,只是线程数量扩充而已,虽然他会执行,但是在没有执行后超过的会进行回收,当然,这个回收一般是超过的,比如以corePooSize为5为例,有1,2,3,4,5,6,这个6是超过的,那么可以回收,无论后面加上7还是8,前面的1,2,3,4,5不会变,通常是这样,当然,如果是从头加入,那么就是6,1,2,3,4,即会变的,一般是尾部加入),一般线程数量不超过他,实际上在服务器中,为了满足用户的数量,通常会设置当前cpu处理的最大数量(具体可以百度)
3:keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize,而TimeUnit代表时间单位,比如TimeUnit.SECONDS就代表秒(前面也操作过这个构造方法)
4:blockingQueue:线程池所用的队列类型
5:threadFactory:线程创建工⼚,可以自定义,有默认值 Executors.defaultThreadFactory()
6:RejectedExecutionHandler:corePoolSize已满,队列已满,maxPoolSize 已满,最后的拒绝策略
下面来看这6个配置参数在任务的提交过程中是怎么运作的,在每次往线程池中提交任务的时候,有如下的处 理流程:
步骤一:判断当前线程数是否大于或等于corePoolSize,如果小于,则新建线程执行,如果大于等于(一般是等于,大于是防止出现问题),则进入步骤 二
步骤二:判断队列是否已满,如未满,则放入,如已满,则进入步骤三
步骤三:判断当前线程数是否大于或等于maximumPoolSize,如果小于,则继续新建线程执行,如果大于或者等于(一般是等于,大于是防止出现问题),则进入步骤 四
步骤四:根据拒绝策略,开始拒绝任务(因为到这里,那么步骤二基本是满足,且线程也到顶了,那么我是忙不过来了,还是拒绝任务吧)
总结一下:⾸先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize(maximumPoolSize的简称),最后使用拒 绝策略
很显然,基于这种流程,如果队列是无界的,将永远没有机会⾛到步骤三(因为步骤二中永远不会都满足条件),也即maximumPoolSize没有使用,也一 定不会⾛到步骤四
线程池的优雅关闭:
线程池的关闭,较之线程的关闭更加复杂,当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调 用者正在向线程池提交任务,并且队列中可能还有未执行的任务,因此,关闭过程不可能是瞬时的,而是需要一个 平滑的过渡(即优雅的关闭),这就涉及线程池的完整⽣命周期管理
1:线程池的⽣命周期
在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在一个字段里面, 即ctl变量(也就是前面说明的private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));),如下图所示,最高的3位存储线程池状态,其余29位存储线程个数,而在JDK 6中,这两个变量是分开存 储的
由上面的代码可以看到,ctl变量被拆成两半,最高的3位用来表示线程池的状态(最终的是COUNT_MASK的结果),低的29位表示线程的个数, 线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED
下面分析状态之间的迁移过程,如图所示:
线程池有两个关闭方法,shutdown()和shutdownNow(),这两个方法会让线程池切换到不同的状态,在队列 为空,线程池也为空之后,进入TIDYING 状态,最后执行一个钩子方法terminated(),进入TERMINATED状态,线 程池才真正关闭
这里的状态迁移有一个⾮常关键的特征:从小到大迁移,-1,0,1(0可以到1,也可以直接到2,看图就知道了),2,3,只会从小的状态值往大的状态值迁 移,不会逆向迁移,例如,当线程池的状态在TIDYING=2时,接下来只可能迁移到TERMINATED=3,不可能迁移回STOP=1或者其他状态
除 terminated()之外,线程池还提供了其他⼏个钩子方法,这些方法的实现基本都是空的,如果想实现自己的线程 池,可以重写这⼏个方法:
protected void beforeExecute ( Thread t, Runnable r) { }
protected void afterExecute ( Runnable r, Throwable t) { }
protected void terminated ( ) { }
2:正确关闭线程池的步骤(上面只是说明状态,而不是关闭操作)
关闭线程池的过程为:在调用 shutdown()或者shutdownNow()之后,线程池并不会立即关闭,我们也可以调 用 awaitTermination() 来等待或者判断线程池是否完全关闭,那么可以任务关闭线程池的正确步骤如下:
executor. shutdown ( ) ;
try {
boolean flag = true ;
do {
flag = ! executor. awaitTermination ( 500 , TimeUnit . MILLISECONDS ) ;
} while ( flag) ;
} catch ( InterruptedException e) {
}
awaitTermination(…)方法的内部实现很简单,如下所示,不断循环判断线程池是否到达了最终状态TERMINATED,如果是,就返回,如果不是,则通过termination条件变量阻塞一段时间,之后继续判断(因为始终的运行是不好的)
public boolean awaitTermination ( long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit. toNanos ( timeout) ;
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
while ( runStateLessThan ( ctl. get ( ) , TERMINATED ) ) {
if ( nanos <= 0L )
return false ;
nanos = termination. awaitNanos ( nanos) ;
}
return true ;
} finally {
mainLock. unlock ( ) ;
}
}
3:shutdown()与shutdownNow()的区别
shutdown()不会清空任务队列,会等所有任务执行完成(既然提交了任务,我就一定要完成,这里一般在对应中断操作里进行),shutdownNow()清空任务队列(直接清空任务),当然他会先中断线程,然后清空,保证(有)任务没有被操作
shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程,中断自然也会使得任务认为执行完了
public void shutdown ( ) {
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
checkShutdownAccess ( ) ;
advanceRunState ( SHUTDOWN ) ;
interruptIdleWorkers ( ) ;
onShutdown ( ) ;
} finally {
mainLock. unlock ( ) ;
}
tryTerminate ( ) ;
}
public List < Runnable > shutdownNow ( ) {
List < Runnable > tasks;
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
checkShutdownAccess ( ) ;
advanceRunState ( STOP ) ;
interruptWorkers ( ) ;
tasks = drainQueue ( ) ;
} finally {
mainLock. unlock ( ) ;
}
tryTerminate ( ) ;
return tasks;
}
下面看一下在上面的代码里中断空闲线程和中断所有线程的区别
shutdown()方法中的interruptIdleWorkers()方法的实现:
private void interruptIdleWorkers ( ) {
interruptIdleWorkers ( false ) ;
}
private void interruptIdleWorkers ( boolean onlyOne) {
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
for ( Worker w : workers) {
Thread t = w. thread;
if ( ! t. isInterrupted ( ) && w. tryLock ( ) ) {
try {
t. interrupt ( ) ;
} catch ( SecurityException ignore) {
} finally {
w. unlock ( ) ;
}
}
if ( onlyOne)
break ;
}
} finally {
mainLock. unlock ( ) ;
}
}
关键区别点在tryLock():一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线 程是否处于空闲状态,tryLock()如果调用成功,说明线程处于空闲状态,向其发送中断信号(true),否则不发送
public boolean tryLock ( ) { return tryAcquire ( 1 ) ; }
protected boolean tryAcquire ( int unused) {
if ( compareAndSetState ( 0 , 1 ) ) {
setExclusiveOwnerThread ( Thread . currentThread ( ) ) ;
return true ;
}
return false ;
}
shutdownNow()调用了 interruptWorkers()方法:
private void interruptWorkers ( ) {
for ( Worker w : workers)
w. interruptIfStarted ( ) ;
}
void interruptIfStarted ( ) {
Thread t;
if ( getState ( ) >= 0 && ( t = thread) != null && ! t. isInterrupted ( ) ) {
try {
t. interrupt ( ) ;
} catch ( SecurityException ignore) {
}
}
}
在上面的代码中,shutdown() 和shutdownNow()最终都调用了tryTerminate()方法,如下所示:
final void tryTerminate ( ) {
for ( ; ; ) {
int c = ctl. get ( ) ;
if ( isRunning ( c) ||
runStateAtLeast ( c, TIDYING ) ||
( runStateLessThan ( c, STOP ) && ! workQueue. isEmpty ( ) ) )
return ;
if ( workerCountOf ( c) != 0 ) {
interruptIdleWorkers ( ONLY_ONE ) ;
return ;
}
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
if ( ctl. compareAndSet ( c, ctlOf ( TIDYING , 0 ) ) ) {
try {
terminated ( ) ;
} finally {
ctl. set ( ctlOf ( TERMINATED , 0 ) ) ;
termination. signalAll ( ) ;
}
return ;
}
} finally {
mainLock. unlock ( ) ;
}
}
}
tryTerminate()不会强行终⽌线程池,只是做了一下检测:当workerCount为0,workerQueue为空时,先把 状态切换到TIDYING,然后调用钩子方法terminated(),当钩子方法执行完成时,把状态从TIDYING 改为 TERMINATED,接着调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程使得返回(前面说明了awaitTermination)
所以,TIDYING和TREMINATED的区别是在二者之间执行了一个钩子方法terminated(),⽬前是一个空实现
任务的提交过程分析:
提交任务的方法如下:
public void execute ( Runnable command) {
if ( command == null )
throw new NullPointerException ( ) ;
int c = ctl. get ( ) ;
if ( workerCountOf ( c) < corePoolSize) {
if ( addWorker ( command, true ) )
return ;
c = ctl. get ( ) ;
}
if ( isRunning ( c) && workQueue. offer ( command) ) {
int recheck = ctl. get ( ) ;
if ( ! isRunning ( recheck) && remove ( command) )
reject ( command) ;
else if ( workerCountOf ( recheck) == 0 )
addWorker ( null , false ) ;
}
else if ( ! addWorker ( command, false ) )
reject ( command) ;
}
private boolean addWorker ( Runnable firstTask, boolean core) {
retry:
for ( int c = ctl. get ( ) ; ; ) {
if ( runStateAtLeast ( c, SHUTDOWN )
&& ( runStateAtLeast ( c, STOP )
|| firstTask != null
|| workQueue. isEmpty ( ) ) )
return false ;
for ( ; ; ) {
if ( workerCountOf ( c)
>= ( ( core ? corePoolSize : maximumPoolSize) & COUNT_MASK ) )
return false ;
if ( compareAndIncrementWorkerCount ( c) )
break retry;
c = ctl. get ( ) ;
if ( runStateAtLeast ( c, SHUTDOWN ) )
continue retry;
}
}
boolean workerStarted = false ;
boolean workerAdded = false ;
Worker w = null ;
try {
w = new Worker ( firstTask) ;
final Thread t = w. thread;
if ( t != null ) {
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
int c = ctl. get ( ) ;
if ( isRunning ( c) ||
( runStateLessThan ( c, STOP ) && firstTask == null ) ) {
if ( t. isAlive ( ) )
throw new IllegalThreadStateException ( ) ;
workers. add ( w) ;
int s = workers. size ( ) ;
if ( s > largestPoolSize)
largestPoolSize = s;
workerAdded = true ;
}
} finally {
mainLock. unlock ( ) ;
}
if ( workerAdded) {
t. start ( ) ;
workerStarted = true ;
}
}
} finally {
if ( ! workerStarted)
addWorkerFailed ( w) ;
}
return workerStarted;
}
任务的执行过程分析:
在上面的任务提交过程中,可能会开启一个新的Worker(然后执行,前面的start,最终关键操作如下:run方法),并把任务本身作为firstTask赋给该Worker,但对于 一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程(如执行完变成空,继续赋值)
下面来看Woker的run()方法的实现过程:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker ( Runnable firstTask) {
setState ( - 1 ) ;
this . firstTask = firstTask;
this . thread = getThreadFactory ( ) . newThread ( this ) ;
}
public void run ( ) {
runWorker ( this ) ;
}
final void runWorker ( Worker w) {
Thread wt = Thread . currentThread ( ) ;
Runnable task = w. firstTask;
w. firstTask = null ;
w. unlock ( ) ;
boolean completedAbruptly = true ;
try {
while ( task != null || ( task = getTask ( ) ) != null ) {
w. lock ( ) ;
if ( ( runStateAtLeast ( ctl. get ( ) , STOP ) ||
( Thread . interrupted ( ) &&
runStateAtLeast ( ctl. get ( ) , STOP ) ) ) &&
! wt. isInterrupted ( ) )
wt. interrupt ( ) ;
try {
beforeExecute ( wt, task) ;
try {
task. run ( ) ;
afterExecute ( task, null ) ;
} catch ( Throwable ex) {
afterExecute ( task, ex) ;
throw ex;
}
} finally {
task = null ;
w. completedTasks++ ;
w. unlock ( ) ;
}
}
completedAbruptly = false ;
} finally {
processWorkerExit ( w, completedAbruptly) ;
}
}
}
shutdown()与任务执行过程综合分析:
把任务的执行过程和上面的线程池的关闭过程结合起来进行分析,当调用 shutdown()的时候,可能出现以下 ⼏种场景:
1:当调用shutdown()的时候,若所有线程都处于空闲状态,这意味着任务队列一定是空的,此时,所有线程都会阻塞在 getTask()方法的地方(通常使得不会执行对应的循环,一般空闲会在这里),然后,所有线程都会 收到interruptIdleWorkers()发来的中断信号,getTask()返回null(中断就不阻塞了,因为有处理了中断操作),所有Worker都会退出while循环,之 后执行processWorkerExit(线程也退出或者删除,不是核心的)
2:当调用shutdown()的时候,若所有线程都处于忙碌状态,此时,队列可能是空的,也可能是⾮空的,interruptIdleWorkers()内部的tryLock调用失败,什么都不会 做(调用成功就会发送中断信号,而调用失败说明在执行中,不是空闲的,前面也说明了),所有线程会继续执行自己当前的任务,之后所有线程会执行完队列中的任务,直到队列为空,getTask()才会返回null(他也会分配队列任务),所以之后,就和场景1一样了,最终也是退出while循环
3:当调用shutdown()的时候,部分线程忙碌,部分线程空闲,有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在 getTask()方法的地方,空闲的这些线程会 和场景1一样处理,不空闲的线程会和场景2一样处理
下面看一下getTask()方法的内部细节:
private Runnable getTask ( ) {
boolean timedOut = false ;
for ( ; ; ) {
int c = ctl. get ( ) ;
if ( runStateAtLeast ( c, SHUTDOWN )
&& ( runStateAtLeast ( c, STOP ) || workQueue. isEmpty ( ) ) ) {
decrementWorkerCount ( ) ;
return null ;
}
int wc = workerCountOf ( c) ;
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ( ( wc > maximumPoolSize || ( timed && timedOut) )
&& ( wc > 1 || workQueue. isEmpty ( ) ) ) {
if ( compareAndDecrementWorkerCount ( c) )
return null ;
continue ;
}
try {
Runnable r = timed ?
workQueue. poll ( keepAliveTime, TimeUnit . NANOSECONDS ) :
workQueue. take ( ) ;
if ( r != null )
return r;
timedOut = true ;
} catch ( InterruptedException retry) {
timedOut = false ;
}
}
}
shutdownNow() 与任务执行过程综合分析:
和上面的 shutdown()类似,只是多了一个环节,即清空任务队列,如果一个线程正在执行某个业务代码,即 使向它发送中断信号,也没有用,只能等它把代码执行完成(从上面看中断会继续执行),因此,中断空闲线程和中断所有线程的区别并不是很 大(除⾮线程当前刚好阻塞在某个地方,所以要这样说,中断所有空闲线程和中断所有线程的区别不大),所以实际上shutdownNow() 主要的操作就是清空所有队列,所以使得始终相当于场景1(因为无论是否忙碌都中断了,相当于场景1的所有空闲都中断)
当一个Worker最终退出的时候,会执行清理工作:
private void processWorkerExit ( Worker w, boolean completedAbruptly) {
if ( completedAbruptly)
decrementWorkerCount ( ) ;
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
completedTaskCount += w. completedTasks;
workers. remove ( w) ;
} finally {
mainLock. unlock ( ) ;
}
tryTerminate ( ) ;
int c = ctl. get ( ) ;
if ( runStateLessThan ( c, STOP ) ) {
if ( ! completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if ( min == 0 && ! workQueue. isEmpty ( ) )
min = 1 ;
if ( workerCountOf ( c) >= min)
return ;
}
addWorker ( null , false ) ;
}
}
线程池的4种拒绝策略:
在前面说明execute(Runnable command)的最后,调用了reject(command)执行拒绝策略,代码如下所示
else if ( ! addWorker ( command, false ) )
reject ( command) ;
final void reject ( Runnable command) {
handler. rejectedExecution ( command, this ) ;
}
handler就是我们可以设置的拒绝策略管理器:
RejectedExecutionHandler 是一个接口,定义了四种实现,分别对应四种不同的拒绝策略,默认是AbortPolicy
package java. util. concurrent ;
public interface RejectedExecutionHandler {
void rejectedExecution ( Runnable r, ThreadPoolExecutor executor) ;
}
ThreadPoolExecutor类中默认的实现是:
public ThreadPoolExecutor ( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue) {
this ( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors . defaultThreadFactory ( ) , defaultHandler) ;
}
public ThreadPoolExecutor ( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue,
RejectedExecutionHandler handler) {
this ( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors . defaultThreadFactory ( ) , handler) ;
}
public ThreadPoolExecutor ( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue,
ThreadFactory threadFactory) {
this ( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler) ;
}
public ThreadPoolExecutor ( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if ( corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0 )
throw new IllegalArgumentException ( ) ;
if ( workQueue == null || threadFactory == null || handler == null )
throw new NullPointerException ( ) ;
this . corePoolSize = corePoolSize;
this . maximumPoolSize = maximumPoolSize;
this . workQueue = workQueue;
this . keepAliveTime = unit. toNanos ( keepAliveTime) ;
this . threadFactory = threadFactory;
this . handler = handler;
}
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy ( ) ;
四种策略的实现代码如下(对应ThreadPoolExecutor的四个内部类):
策略1:调用者直接在自己的线程里执行,线程池不处理,比如到医院打点滴,医院没地方了,到你家自己操 作吧:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy ( ) { }
public void rejectedExecution ( Runnable r, ThreadPoolExecutor e) {
if ( ! e. isShutdown ( ) ) {
r. run ( ) ;
}
}
}
策略2:线程池抛异常(这个是默认的):
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy ( ) { }
public void rejectedExecution ( Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException ( "Task " + r. toString ( ) +
" rejected from " +
e. toString ( ) ) ;
}
}
策略3:线程池直接丢掉任务,神不知⻤不觉(什么都不处理,自然就是丢掉了,因为也没有在队列里面(因为是操作满的)):
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy ( ) { }
public void rejectedExecution ( Runnable r, ThreadPoolExecutor e) {
}
}
策略4:删除队列中最早的任务,将当前任务入队列:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy ( ) { }
public void rejectedExecution ( Runnable r, ThreadPoolExecutor e) {
if ( ! e. isShutdown ( ) ) {
e. getQueue ( ) . poll ( ) ;
e. execute ( r) ;
}
}
}
示例程序:
package main6 ;
import java. util. concurrent. ArrayBlockingQueue ;
import java. util. concurrent. ThreadPoolExecutor ;
import java. util. concurrent. TimeUnit ;
public class ThreadPoolExecutorDemo {
public static void main ( String [ ] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor ( 3 , 5 , 1 ,
TimeUnit . SECONDS , new ArrayBlockingQueue < > ( 3 ) ,
new ThreadPoolExecutor. DiscardPolicy ( ) ) ;
for ( int i = 0 ; i < 20 ; i++ ) {
int finalI = i;
executor. execute ( new Runnable ( ) {
@Override
public void run ( ) {
System . out. println ( Thread . currentThread ( ) . getId ( ) + "[" + finalI + "] -- 开始" ) ;
try {
Thread . sleep ( 5000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System . out. println ( Thread . currentThread ( ) . getId ( ) + "[" + finalI + "] -- 结束" ) ;
}
} ) ;
try {
Thread . sleep ( 200 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
System . out. println ( 1 ) ;
executor. shutdown ( ) ;
boolean flag = true ;
try {
do {
flag = ! executor. awaitTermination ( 1 , TimeUnit . SECONDS ) ;
System . out. println ( flag) ;
} while ( flag) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System . out. println ( "线程池关闭成功。。。" ) ;
System . out. println ( Thread . currentThread ( ) . getId ( ) ) ;
}
}
Executors工具类:
concurrent包提供了Executors工具类,利用它可以创建各种不同类型的线程池,前面有操作过,如:
ExecutorService executorService = Executors . newFixedThreadPool ( 10 ) ;
四种对比:
单线程的线程池:
public static ExecutorService newSingleThreadExecutor ( ) {
return new FinalizableDelegatedExecutorService
( new ThreadPoolExecutor ( 1 , 1 ,
0L , TimeUnit . MILLISECONDS ,
new LinkedBlockingQueue < Runnable > ( ) ) ) ;
}
固定数⽬线程的线程池:
public static ExecutorService newFixedThreadPool ( int nThreads) {
return new ThreadPoolExecutor ( nThreads, nThreads,
0L , TimeUnit . MILLISECONDS ,
new LinkedBlockingQueue < Runnable > ( ) ) ;
}
每接收一个请求,就创建一个线程来执行:
public static ExecutorService newCachedThreadPool ( ) {
return new ThreadPoolExecutor ( 0 , Integer . MAX_VALUE ,
60L , TimeUnit . SECONDS ,
new SynchronousQueue < Runnable > ( ) ) ;
}
单线程具有周期调度功能的线程池:
public static ScheduledExecutorService newSingleThreadScheduledExecutor ( ) {
return new DelegatedScheduledExecutorService
( new ScheduledThreadPoolExecutor ( 1 ) ) ;
}
多线程,有调度功能的线程池:
public static ScheduledExecutorService newScheduledThreadPool ( int corePoolSize) {
return new ScheduledThreadPoolExecutor ( corePoolSize) ;
}
有ThreadPool的是可以多,有Executor的是单
最佳实践:
不同类型的线程池,其实都是由前面的⼏个关键配置参数配置而成的,在《阿里巴巴Java开发⼿册》中,明确禁⽌使用Executors创建线程池,并要求开发者直接使用ThreadPoolExector或ScheduledThreadPoolExecutor进行创建,这样做是为了强制开发者明确线程池的运行策略,使其对线程池的每个配置参数皆做到⼼中有数,以规避因使用不当而造成资源耗尽的⻛险,之所以这样,是因为我们可以看到上面的最终代码内容实际上基本 (之所以是基本,实际上是因为有些是被当成参数而已,虽然基本都是Executor的子类)就是使用这两个类来实现的,而由于他们的参数是固定的,所以在一定的情况下并不是非常好的(因为固定),所以才会建议自己进行创建,而不是使用这些固定的参数
ScheduledThreadPoolExecutor:
ScheduledThreadPoolExecutor实现了按时间调度来执行任务:
1:延迟执行任务
public ScheduledFuture < ? > schedule ( Runnable command,
long delay,
TimeUnit unit) {
if ( command == null || unit == null )
throw new NullPointerException ( ) ;
RunnableScheduledFuture < Void > t = decorateTask ( command,
new ScheduledFutureTask < Void > ( command, null ,
triggerTime ( delay, unit) ,
sequencer. getAndIncrement ( ) ) ) ;
delayedExecute ( t) ;
return t;
}
public < V > ScheduledFuture < V > schedule ( Callable < V > callable,
long delay,
TimeUnit unit) {
if ( callable == null || unit == null )
throw new NullPointerException ( ) ;
RunnableScheduledFuture < V > t = decorateTask ( callable,
new ScheduledFutureTask < V > ( callable,
triggerTime ( delay, unit) ,
sequencer. getAndIncrement ( ) ) ) ;
delayedExecute ( t) ;
return t;
}
2:周期执行任务
public ScheduledFuture < ? > scheduleAtFixedRate ( Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if ( command == null || unit == null )
throw new NullPointerException ( ) ;
if ( period <= 0L )
throw new IllegalArgumentException ( ) ;
ScheduledFutureTask < Void > sft =
new ScheduledFutureTask < Void > ( command,
null ,
triggerTime ( initialDelay, unit) ,
unit. toNanos ( period) ,
sequencer. getAndIncrement ( ) ) ;
RunnableScheduledFuture < Void > t = decorateTask ( command, sft) ;
sft. outerTask = t;
delayedExecute ( t) ;
return t;
}
public ScheduledFuture < ? > scheduleWithFixedDelay ( Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if ( command == null || unit == null )
throw new NullPointerException ( ) ;
if ( delay <= 0L )
throw new IllegalArgumentException ( ) ;
ScheduledFutureTask < Void > sft =
new ScheduledFutureTask < Void > ( command,
null ,
triggerTime ( initialDelay, unit) ,
- unit. toNanos ( delay) ,
sequencer. getAndIncrement ( ) ) ;
RunnableScheduledFuture < Void > t = decorateTask ( command, sft) ;
sft. outerTask = t;
delayedExecute ( t) ;
return t;
}
区别如下:
AtFixedRate:按固定频率执行,与任务本身执行时间无关,但有个前提条件,任务执行时间必须小于间隔时 间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s(否则可能抢占结束执行或者重新操作间隔),那么这个时候虽然你在执行,但是不会等你执行完了,我累积的任务会去执行了
WithFixedDelay:按固定间隔执行,与任务本身执行时间有关,例如,任务本身执行时间是10s,间隔2s(delay参数),则 下一次开始执行的时间就是12s
对于延迟来说,就是等会执行,意思容易理解,就不多说了(对应的参数只是操作是否有返回的线程执行),他们都是操作线程,而后面的间隔只操作(不是是否)没有返回(返回值)的线程执行(Runnable)
延迟执行和周期性执行的原理:
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,这意味着其内部的数据结构和ThreadPoolExecutor是基本一样的,那它是如何实现延迟执行任务和周期性执行任务的呢?
延迟执行任务依靠的是DelayQueue(延时队列,他里面的是DelayedWorkQueue,DelayQueue只是代表名称而已哦),DelayQueue是 BlockingQueue的一种,其实现原理是二叉堆,而周期性执行任务是执行完一个任务之后,再把该任务扔回到任务队列中,如此就可以对一个任务反复执行,不过这里并没有使用DelayQueue,而是在ScheduledThreadPoolExecutor内部又实现了一个特定的DelayQueue(延迟队列,前面也说明了,对应利用优先队列,由于会扩容,那么是无界的,没有容量的队列也能称为无界,比如链表的实现的队列SynchronousQueue),特定的即是DelayedWorkQueue
static class DelayedWorkQueue extends AbstractQueue < Runnable >
implements BlockingQueue < Runnable > {
其原理和DelayQueue一样,但这里操作了任务,所以在队列基础上一般会进行优化,比如针对任务的取消进行了优化,下面主要讲延迟执行和周期性执行的实现过程
延迟执行:
public ScheduledFuture < ? > schedule ( Runnable command,
long delay,
TimeUnit unit) {
if ( command == null || unit == null )
throw new NullPointerException ( ) ;
RunnableScheduledFuture < Void > t = decorateTask ( command,
new ScheduledFutureTask < Void > ( command, null ,
triggerTime ( delay, unit) ,
sequencer. getAndIncrement ( ) ) ) ;
delayedExecute ( t) ;
return t;
}
传进去的是一个Runnable,外加延迟时间delay,在内部通过decorateTask(…)方法把Runnable包装成一个ScheduleFutureTask对象,而DelayedWorkQueue中存放的正是这种类型的对象,这种类型的对象一定实现了Delayed接口:
private RunnableScheduledFuture < ? > [ ] queue =
new RunnableScheduledFuture < ? > [ INITIAL_CAPACITY ] ;
我们继续看看上面的delayedExecute(t);方法内容:
protected < V > RunnableScheduledFuture < V > decorateTask (
Runnable runnable, RunnableScheduledFuture < V > task) {
return task;
}
private void delayedExecute ( RunnableScheduledFuture < ? > task) {
if ( isShutdown ( ) )
reject ( task) ;
else {
super . getQueue ( ) . add ( task) ;
if ( ! canRunInCurrentRunState ( task) && remove ( task) )
task. cancel ( false ) ;
else
ensurePrestart ( ) ;
}
}
void ensurePrestart ( ) {
int wc = workerCountOf ( ctl. get ( ) ) ;
if ( wc < corePoolSize)
addWorker ( null , true ) ;
else if ( wc == 0 )
addWorker ( null , false ) ;
}
从上面的代码中可以看出,schedule()方法本身很简单,就是把提交的Runnable任务加上delay时间,转换成ScheduledFutureTask对象,放入DelayedWorkerQueue中,任务的执行过程还是复用的ThreadPoolExecutor, 延迟的控制是在DelayedWorkerQueue内部完成的(因为我们设置的线程池的队列是延迟队列)
周期性执行:
public ScheduledFuture < ? > scheduleAtFixedRate ( Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if ( command == null || unit == null )
throw new NullPointerException ( ) ;
if ( period <= 0L )
throw new IllegalArgumentException ( ) ;
ScheduledFutureTask < Void > sft =
new ScheduledFutureTask < Void > ( command,
null ,
triggerTime ( initialDelay, unit) ,
unit. toNanos ( period) ,
sequencer. getAndIncrement ( ) ) ;
RunnableScheduledFuture < Void > t = decorateTask ( command, sft) ;
sft. outerTask = t;
delayedExecute ( t) ;
return t;
}
public ScheduledFuture < ? > scheduleWithFixedDelay ( Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if ( command == null || unit == null )
throw new NullPointerException ( ) ;
if ( delay <= 0L )
throw new IllegalArgumentException ( ) ;
ScheduledFutureTask < Void > sft =
new ScheduledFutureTask < Void > ( command,
null ,
triggerTime ( initialDelay, unit) ,
- unit. toNanos ( delay) ,
sequencer. getAndIncrement ( ) ) ;
RunnableScheduledFuture < Void > t = decorateTask ( command, sft) ;
sft. outerTask = t;
delayedExecute ( t) ;
return t;
}
和schedule(…)方法的框架基本一样,也是包装一个ScheduledFutureTask对象,只是在延迟时间参数之外多 了一个周期参数,然后放入DelayedWorkerQueue就结束了
两个方法的区别在于一个传入的周期是一个负数,另一个传入的周期是一个正数,为什么要这样做呢?
主要用于⽣成任务序列号的sequencer,创建ScheduledFutureTask的时候使用:
private static final AtomicLong sequencer = new AtomicLong ( ) ;
private class ScheduledFutureTask < V >
extends FutureTask < V > implements RunnableScheduledFuture < V > {
private final long sequenceNumber;
private volatile long time;
private final long period;
ScheduledFutureTask ( Runnable r, V result, long triggerTime,
long sequenceNumber) {
super ( r, result) ;
this . time = triggerTime;
this . period = 0 ;
this . sequenceNumber = sequenceNumber;
}
ScheduledFutureTask ( Runnable r, V result, long triggerTime,
long period, long sequenceNumber) {
super ( r, result) ;
this . time = triggerTime;
this . period = period;
this . sequenceNumber = sequenceNumber;
}
public long getDelay ( TimeUnit unit) {
return unit. convert ( time - System . nanoTime ( ) , NANOSECONDS ) ;
}
public int compareTo ( Delayed other) {
if ( other == this )
return 0 ;
if ( other instanceof ScheduledFutureTask ) {
ScheduledFutureTask < ? > x = ( ScheduledFutureTask < ? > ) other;
long diff = time - x. time;
if ( diff < 0 )
return - 1 ;
else if ( diff > 0 )
return 1 ;
else if ( sequenceNumber < x. sequenceNumber)
return - 1 ;
else
return 1 ;
}
long diff = getDelay ( NANOSECONDS ) - other. getDelay ( NANOSECONDS ) ;
return ( diff < 0 ) ? - 1 : ( diff > 0 ) ? 1 : 0 ;
}
public void run ( ) {
if ( ! canRunInCurrentRunState ( this ) )
cancel ( false ) ;
else if ( ! isPeriodic ( ) )
super . run ( ) ;
else if ( super . runAndReset ( ) ) {
setNextRunTime ( ) ;
reExecutePeriodic ( outerTask) ;
}
}
private void setNextRunTime ( ) {
long p = period;
if ( p > 0 )
time += p;
else
time = triggerTime ( - p) ;
}
}
long triggerTime ( long delay) {
return System . nanoTime ( ) +
( ( delay < ( Long . MAX_VALUE >> 1 ) ) ? delay : overflowFree ( delay) ) ;
}
void reExecutePeriodic ( RunnableScheduledFuture < ? > task) {
if ( canRunInCurrentRunState ( task) ) {
super . getQueue ( ) . add ( task) ;
if ( canRunInCurrentRunState ( task) || ! remove ( task) ) {
ensurePrestart ( ) ;
return ;
}
}
task. cancel ( false ) ;
}
withFixedDelay和atFixedRate的区别就体现在setNextRunTime里面,如果是atFixedRate,period>0,下一次开始执行时间等于上一次开始执行时间(因为是time的,虽然第一次是延时,time一开始是延迟时间,既然要周期,自然要考虑加上什么时间)+period,如果是withFixedDelay,period < 0,下一次开始执行时间等于triggerTime(-p),相当于是now+(-period),now即是现在执行完的结束时间,相当于就是加上-period(period < 0)了,并且也加上了now,所以说AtFixedRate是基本固定的频率,因为他基本不变(不考虑初始的延时,那么就是period),所以可能你在执行,其他的任务可能不会等你执行完,即也会执行(因为是延迟队列,他的出队并不是有线程就出对,也是需要延迟的,而另外一个基本会等待执行,因为等待时间加上了他的执行时间(虽然线程中是没有等待的,但是由于时间的存在,看起来是等待的,虽然我们也可以根据时间来判断谁先会执行,或者说决定队列的位置(所以前面有个sequencer.getAndIncrement()来保证若时间相同时的唯一,一般代表先提交的会先被执行,即序列号小,那么先执行),一般延迟队列是根据延迟时间排列的),而这个可以执行时间大于间隔,所以其他任务也会执行
这里再次给出之前的说明:
AtFixedRate:按固定频率执行,与任务本身执行时间无关,但有个前提条件,任务执行时间必须小于间隔时 间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s(否则可能抢占结束执行或者重新操作间隔)
WithFixedDelay:按固定间隔执行,与任务本身执行时间有关,例如,任务本身执行时间是10s(所以now即是现在执行完的结束时间),间隔2s,则 下一次开始执行的时间就是12s,即这里是相加,基本固定12s,即固定的间隔,只是与执行时间有关
简单来说,就是AtFixedRate按照period来周期,而WithFixedDelay按照执行时间(上一次的,因为我执行完了,需要开始新的一次了)加period来周期
上面是不同的策略,可以选择使用,因为是不同的方法
CompletableFuture用法:
从JDK 8开始,在Concurrent包中提供了一个强大的异步编程工具CompletableFuture,在JDK8之前,异步编 程(即并行的,否则后面怎么会阻塞(下面案例的get方法)并解除呢)可以通过线程池和Future来实现(前面给出过案例,即ThreadPoolTest类的案例,使用了线程池的,只是是固定参数,是ThreadPoolExecutor线程池的固定参数的创建方式),但功能还不够强大
示例代码:
package main7 ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. ExecutionException ;
public class CompletableFutureDemo {
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture < String > future = new CompletableFuture < > ( ) ;
new Thread ( ( ) -> {
try {
Thread . sleep ( 1000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
future. complete ( "hello world" ) ;
} ) . start ( ) ;
System . out. println ( "获取结果中。。。" ) ;
String result = future. get ( ) ;
System . out. println ( "获取的结果:" + result) ;
}
}
CompletableFuture实现了Future接口,所以它也具有Future的特性:调用get()方法会阻塞在那,直到结果返 回,这里唯一的特点是,他可以定义返回信息,而不是经过对应的线程得到结果,或者说他只是返回参数,底层由线程使得执行方法返回而已
而由于只是返回结果,并没有清空结果(使得为null,或者继续阻塞),所以可以说1个线程调用complete方法完成该Future,则所有阻塞在get()方法的线程都将获得返回结果,同理对应的线程池加上Future也是一样的,你可以在案例中操作多个new Thread就知道了(里面要有future.get())
runAsync与supplyAsync:
上面的例子是一个空的任务(只是一个返回值),下面尝试提交一个真的任务,然后等待结果返回
runAsync(Runnable):
package main7 ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. ExecutionException ;
public class CompletableFutureDemo2 {
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture < Void > voidCompletableFuture =
CompletableFuture . runAsync ( ( ) -> {
try {
Thread . sleep ( 2000 ) ;
System . out. println ( "任务执行完成" ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
} ) ;
Object unused = voidCompletableFuture. get ( ) ;
System . out. println ( unused) ;
System . out. println ( "程序运行结束" ) ;
}
}
CompletableFuture.runAsync(…)传入的是一个Runnable接口
supplyAsync(Supplier):
package main7 ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. ExecutionException ;
import java. util. concurrent. TimeUnit ;
import java. util. function. Supplier ;
public class CompletableFutureDemo3 {
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture < String > future = CompletableFuture . supplyAsync ( new Supplier < String > ( ) {
@Override
public String get ( ) {
try {
TimeUnit . SECONDS . sleep ( 2 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
return "这是结果" ;
}
} ) ;
String result = future. get ( ) ;
System . out. println ( "任务执行结果:" + result) ;
}
}
runAsync和supplyAsync的区别在于,supplyAsync的任务有返回值,没有返回值的任务,提交的是Runnable,返回的是 CompletableFuture,有返回值的任务,提交的是 Supplier,返回的是CompletableFuture(即对应创建对象返回的结果都是赋值给CompletableFuture类型的引用)
Supplier和前面的Callable很相似,都是操作计算结果,前面的CompletableFutureDemo案例是直接给结果了
通过上面两个例子可以看出,在基本的用法上,CompletableFuture和Future很相似,都可以提交两类任务: 一类是无返回值的,另一类是有返回值的,其中Future无返回值在前面的案例中有体现,即ThreadPoolTest案例中不操作Callable的任务就是返回null,也就是无返回值
thenRun、thenAccept和thenApply:
对于 Future,在提交任务之后,只能调用 get()等结果返回,但对于 CompletableFuture,可以在结果上面再 加一个callback,当得到结果之后,再接着执行callback
thenRun(Runnable):
package main7 ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. ExecutionException ;
public class CompletableFutureDemo4 {
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture voidCompletableFuture = CompletableFuture . supplyAsync ( ( ) -> {
try {
Thread . sleep ( 5 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System . out. println ( 1 ) ;
return "这是结果" ;
} ) . thenRun ( ( ) -> {
try {
Thread . sleep ( 2 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System . out. println ( "任务执行结束之后执行的语句" ) ;
} ) ;
Object o = voidCompletableFuture. get ( ) ;
System . out. println ( o) ;
System . out. println ( "任务执行结束" ) ;
}
}
该案例最后不能获取到结果,只会得到一个null
thenAccept(Consumer):
package main7 ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. ExecutionException ;
public class CompletableFutureDemo5 {
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture < Void > future = CompletableFuture . supplyAsync ( ( ) -> {
try {
Thread . sleep ( 5000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System . out. println ( "返回中间结果" ) ;
return "这是中间结果" ;
} ) . thenAccept ( ( param) -> {
try {
Thread . sleep ( 2000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System . out. println ( "任务执行后获得前面的中间结果:" + param) ;
} ) ;
Object unused = future. get ( ) ;
System . out. println ( unused) ;
System . out. println ( "任务执行完成" ) ;
}
}
即上述代码在thenAccept中可以获取任务的执行结果,接着进行再次的处理
thenApply(Function):
package main7 ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. ExecutionException ;
import java. util. function. Function ;
public class CompletableFutureDemo6 {
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture < Integer > future = CompletableFuture . supplyAsync ( ( ) -> {
try {
Thread . sleep ( 5000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System . out. println ( "返回中间结果" ) ;
return "abcdefg" ;
} ) . thenApply ( new Function < String , Integer > ( ) {
@Override
public Integer apply ( String middle) {
try {
Thread . sleep ( 2000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System . out. println ( middle) ;
System . out. println ( "获取中间结果,再次计算返回" ) ;
return middle. length ( ) ;
}
} ) ;
Integer integer = future. get ( ) ;
System . out. println ( "最终的结果为:" + integer) ;
}
}
三个例子都是在任务执行完成之后,接着执行回调,只是回调的形式不同:
1:thenRun后面跟的是一个无参数、无返回值的方法,即Runnable,所以最终的返回值是CompletableFuture类型
2:thenAccept后面跟的是一个有参数、无返回值的方法,称为Consumer,返回值也是CompletableFuture类型,顾名思义,只进不出,所以称为Consumer,前面的Supplier(中间结果),是无参 数,有返回值,只出不进,和Consumer刚好相反
3:thenApply 后面跟的是一个有参数、有返回值的方法,称为Function,返回值是CompletableFuture类型,注意:参数接收的是前一个任务,即 supplyAsync(…)这个任务的返回值,因此对应的只能用supplyAsync,不能用runAsync,因为runAsync没有返回值,不能为下一个链式方法传入参数,如果用了runAsync,那么会报错,这个报错只是针对thenApply,其他默认的参数值为null,比如thenAccept就是,而之所以只是针对thenApply,是因为我们认为他是利用参数的,若参数是null,自然不能利用(因为容易出现空指针异常),所以规定对他来说是报错,而不是默认null(虽然实际上是可以不利用)
thenCompose与thenCombine:
thenCompose:
在上面的例子中,thenApply接收的是一个Function,但是这个Function的返回值是一个通常的基本数据类型 或一个对象,而不是另外一个 CompletableFuture,如果 Function 的返回值也是一个CompletableFuture,就会 出现嵌套的CompletableFuture,比如考虑下面的例子:
package main7 ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. ExecutionException ;
import java. util. function. Function ;
import java. util. function. Supplier ;
public class CompletableFutureDemo7 {
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture < CompletableFuture < Integer > > future =
CompletableFuture . supplyAsync ( new Supplier < String > ( ) {
@Override
public String get ( ) {
return "hello world" ;
}
} ) . thenApply ( new Function < String , CompletableFuture < Integer > > ( ) {
@Override
public CompletableFuture < Integer > apply ( String s) {
return CompletableFuture . supplyAsync ( new Supplier < Integer > ( ) {
@Override
public Integer get ( ) {
return s. length ( ) ;
}
} ) ;
}
} ) ;
CompletableFuture < Integer > future1 = future. get ( ) ;
Integer result = future1. get ( ) ;
System . out. println ( result) ;
}
}
如果希望返回值是一个⾮嵌套的CompletableFuture,可以使用thenCompose:
package main7 ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. CompletionStage ;
import java. util. concurrent. ExecutionException ;
import java. util. function. Function ;
import java. util. function. Supplier ;
public class CompletableFutureDemo8 {
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture < Integer > future = CompletableFuture . supplyAsync ( new Supplier < String > ( ) {
@Override
public String get ( ) {
return "hello world" ;
}
} ) . thenCompose ( new Function < String , CompletionStage < Integer > > ( ) {
@Override
public CompletionStage < Integer > apply ( String s) {
return CompletableFuture . supplyAsync ( new Supplier < Integer > ( ) {
@Override
public Integer get ( ) {
return s. length ( ) ;
}
} ) ;
}
} ) ;
Integer integer = future. get ( ) ;
System . out. println ( integer) ;
}
}
下面是thenCompose方法的接口定义:
public < U > CompletionStage < U > thenCompose
( Function < ? super T , ? extends CompletionStage < U > > fn) ;
CompletableFuture中的实现:
public < U > CompletableFuture < U > thenCompose (
Function < ? super T , ? extends CompletionStage < U > > fn) {
return uniComposeStage ( null , fn) ;
}
从该方法的定义可以看出,它传入的参数是一个Function类型(只有一个参数 ),并且Function的返回值必须是CompletionStage的子类,也就是CompletableFuture类型
thenCombine:
thenCombine方法的接口定义如下,从传入的参数可以看出,它不同于thenCompose
public < U , V > CompletableFuture < V > thenCombine (
CompletionStage < ? extends U > other,
BiFunction < ? super T , ? super U , ? extends V > fn) {
return biApplyStage ( null , other, fn) ;
}
第1个参数是一个CompletableFuture类型(可以是),第2个参数是一个方法,并且是一个BiFunction,也就是该方法有2个 输入参数,1个返回值(两个参数)
从该接口的定义可以大致推测,它是要在2个 CompletableFuture 完成之后,把2个CompletableFuture的返 回值传进去,再额外做一些事情,实例如下:
package main7 ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. ExecutionException ;
import java. util. function. BiFunction ;
import java. util. function. Supplier ;
public class CompletableFutureDemo9 {
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture < Integer > future = CompletableFuture . supplyAsync ( new Supplier < String > ( ) {
@Override
public String get ( ) {
return "hello" ;
}
} ) . thenCombine ( CompletableFuture . supplyAsync ( new Supplier < String > ( ) {
@Override
public String get ( ) {
return "lagou" ;
}
} ) , new BiFunction < String , String , Integer > ( ) {
@Override
public Integer apply ( String s, String s2) {
return s. length ( ) + s2. length ( ) ;
}
} ) ;
Integer result = future. get ( ) ;
System . out. println ( result) ;
}
}
任意个CompletableFuture的组合:
上面的thenCompose和thenCombine基本只能组合2个CompletableFuture(当然由于是返回值,你可以选择嵌套,其中由于thenCombine只看第一层,那么他基本不操作嵌套,因为不改变返回值,所以他基本不会嵌套,虽然也行,但是基本没有作用),所以这里是组合,而不是嵌套(组合代表有效的使用,而不是直接的嵌套使用),而接下来的allOf 和anyOf 可以组合 任意多个CompletableFuture,方法接口定义如下所示
public static CompletableFuture < Void > allOf ( CompletableFuture < ? > . . . cfs) {
return andTree ( cfs, 0 , cfs. length - 1 ) ;
}
public static CompletableFuture < Object > anyOf ( CompletableFuture < ? > . . . cfs) {
int n; Object r;
if ( ( n = cfs. length) <= 1 )
return ( n == 0 )
? new CompletableFuture < Object > ( )
: uniCopyStage ( cfs[ 0 ] ) ;
for ( CompletableFuture < ? > cf : cfs)
if ( ( r = cf. result) != null )
return new CompletableFuture < Object > ( encodeRelay ( r) ) ;
cfs = cfs. clone ( ) ;
CompletableFuture < Object > d = new CompletableFuture < > ( ) ;
for ( CompletableFuture < ? > cf : cfs)
cf. unipush ( new AnyOf ( d, cf, cfs) ) ;
if ( d. result != null )
for ( int i = 0 , len = cfs. length; i < len; i++ )
if ( cfs[ i] . result != null )
for ( i++ ; i < len; i++ )
if ( cfs[ i] . result == null )
cfs[ i] . cleanStack ( ) ;
return d;
}
⾸先,这两个方法都是静态方法,参数是变长的CompletableFuture的集合,其次,allOf和anyOf的区别,前 者是"与",后者是"或"
allOf的返回值是CompletableFuture类型,这是因为每个传入的CompletableFuture的返回值都可能 不同,所以组合的结果是无法用某种类型来表示的,索性返回Void类型
anyOf 的含义是只要有任意一个 CompletableFuture 结束,就可以做接下来的事情,而无须像AllOf那样,等 待所有的CompletableFuture结束,但由于每个CompletableFuture的返回值类型都可能不同,任意一个,意味着无法判断是什么类型,所以anyOf的返回值是CompletableFuture< Object>类型
接下来给出案例(这里提一下,我们知道他们是异步的,如果没有阻塞的存在,那么当主程序结束时,他们会中断的,也就是说,如果你循环多个对应的操作,但是由于线程池的原因,那么有些可能没有执行,就结束了):
package main7 ;
import java. util. Random ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. ExecutionException ;
import java. util. function. Function ;
public class CompletableFutureDemo11 {
private static final Random RANDOM = new Random ( ) ;
private static volatile int result = 0 ;
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture [ ] futures = new CompletableFuture [ 10 ] ;
for ( int i = 0 ; i < 10 ; i++ ) {
int finalI = i;
CompletableFuture < Void > future = CompletableFuture . runAsync ( new Runnable ( ) {
@Override
public void run ( ) {
try {
Thread . sleep ( 1000 + RANDOM . nextInt ( 1000 ) ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
result++ ;
}
} ) ;
futures[ i] = future;
}
System . out. println ( result) ;
Thread . sleep ( 5000 ) ;
Integer anyResult = CompletableFuture . anyOf ( futures) . thenApply ( new Function < Object , Integer > ( ) {
@Override
public Integer apply ( Object o) {
return result;
}
} ) . get ( ) ;
System . out. println ( anyResult) ;
}
}
package main7 ;
import java. util. Random ;
import java. util. concurrent. CompletableFuture ;
import java. util. concurrent. ExecutionException ;
import java. util. function. Function ;
public class CompletableFutureDemo12 {
private static final Random RANDOM = new Random ( ) ;
private static volatile int result = 0 ;
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
CompletableFuture [ ] futures = new CompletableFuture [ 10 ] ;
for ( int i = 0 ; i < 10 ; i++ ) {
int finalI = i;
CompletableFuture < Void > future = CompletableFuture . runAsync ( new Runnable ( ) {
@Override
public void run ( ) {
try {
Thread . sleep ( 1000 + RANDOM . nextInt ( 1000 ) ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
result++ ;
}
} ) ;
futures[ i] = future;
}
System . out. println ( result) ;
for ( int i = 0 ; i < 10 ; i++ ) {
futures[ i] . get ( ) ;
System . out. println ( result) ;
}
Integer allResult = CompletableFuture . allOf ( futures) . thenApply ( new Function < Void , Integer > ( ) {
@Override
public Integer apply ( Void unused) {
return result;
}
} ) . get ( ) ;
System . out. println ( allResult) ;
}
}
四种任务原型:
通过上面的例子可以总结出,提交给CompletableFuture执行的任务有四种类型:Runnable(thenRun)、Consumer(thenAccept)、Supplier(supplyAsync)、Function(thenApply )下面是这四种任务原型的对比
对应的提交方法代表是他对应的参数是该接口
runAsync 与 supplierAsync(supplyAsync的别名) 是 CompletableFuture 的静态方法,而 thenAccept、thenAsync、thenApplyCompletableFutre的成员方法
因为初始的时候没有CompletableFuture对象,也没有参数可传,所以提交的只能是Runnable或者Supplier, 只能是静态方法
通过静态方法⽣成CompletableFuture对象之后,便可以链式地提交其他任务了,这个时候就可以提交Runnable、Consumer、Function,且都是成员方法
CompletionStage接口:
CompletableFuture不仅实现了Future接口,还实现了CompletableStage接口
public class CompletableFuture < T > implements Future < T > , CompletionStage < T > {
CompletionStage接口定义的正是前面的各种链式方法、组合方法,如下所示
package java. util. concurrent ;
public interface CompletionStage < T > {
public CompletionStage < Void > thenRun ( Runnable action) ;
public CompletionStage < Void > thenAccept ( Consumer < ? super T > action) ;
public < U > CompletionStage < U > thenApply ( Function < ? super T , ? extends U > fn) ;
public < U > CompletionStage < U > thenCompose
( Function < ? super T , ? extends CompletionStage < U > > fn) ;
public < U , V > CompletionStage < V > thenCombine
( CompletionStage < ? extends U > other, BiFunction < ? super T , ? super U , ? extends V > fn) ;
}
关于CompletionStage接口,有⼏个关键点要说明:
1:所有方法的返回值都是CompletionStage类型,也就是它自己,正因为如此,才能实现如下的链式调用:future1.thenApply(…).thenApply(…).thenCompose(…).thenRun(…)
2:thenApply接收的是一个有输入参数、返回值的Function,这个Function的输入参数,必须是?Super T 类型,也就是可以赋值T或者T的父类型(在22章博客有说明),而返回值则必 须是?Extends U类型,也就是U或者U的子类型(注意:虽然在某些方面可能只是确定保留类型,他只是赋值而已,并没有参与真正的操作,就如没有指向add一样,22章博客),而U恰好是thenApply的返回值的CompletionStage对 应的类型(还是注意:U只是说明,实际上看创建的对象的操作,虽然引用会在编译期影响),所以在22章博客中一般extends不能添加,因为是编译期的,实际上如果跳过这个,只会看对象,这里就是看对象
其他方法,诸如thenCompose、thenCombine也是类似的原理
CompletableFuture内部原理:
CompletableFuture的构造:ForkJoinPool
CompletableFuture中任务的执行依靠ForkJoinPool:
public class CompletableFuture < T > implements Future < T > , CompletionStage < T > {
private static final Executor asyncPool = useCommonPool ? ForkJoinPool . commonPool ( ) : new ThreadPerTaskExecutor ( ) ;
public static < U > CompletableFuture < U > supplyAsync ( Supplier < U > supplier) {
return asyncSupplyStage ( asyncPool, supplier) ;
}
static < U > CompletableFuture < U > asyncSupplyStage ( Executor e, Supplier < U > f) {
if ( f == null ) throw new NullPointerException ( ) ;
CompletableFuture < U > d = new CompletableFuture < U > ( ) ;
e. execute ( new AsyncSupply < U > ( d, f) ) ;
return d;
}
}
通过上面的代码可以看到,asyncPool是一个static类型,supplierAsync、asyncSupplyStage也都是static方 法,Static方法会返回一个CompletableFuture类型对象,之后就可以链式调用CompletionStage里面的各个方 法(子类有对应的方法的,并且是实现方式,必然有)
任务类型的适配:
ForkJoinPool接受的任务是ForkJoinTask 类型,而我们向CompletableFuture提交的任务 是 Runnable/Supplier/Consumer/Function ,因此,肯定需要一个适配机制,把这四种类型的任务转换成ForkJoinTask,然后提交给ForkJoinPool,如下图所示:
为了完成这种转换,在CompletableFuture内部定义了一系列的内部类,下图是CompletableFuture的各种内 部类的继承体系
都是白色实线,即都是继承的意思,即虽然是这样的转换,但是最终的执行还是对应参数的对应方法的
在 supplyAsync(…)方法内部,会把一个 Supplier 转换成一个 AsyncSupply(上面的e.execute(new AsyncSupply< U>(d, f));),其中f就是Supplier,然后提交给ForkJoinPool执行,在runAsync(…)方法内部,会把一个Runnable转换成一个AsyncRun,然后提交给ForkJoinPool执行,在 thenRun/thenAccept/thenApply 内部,会分别把 Runnable/Consumer/Function 转换成UniRun/UniAccept/UniApply对象,然后提交给ForkJoinPool执行,除此之外,还有两种 CompletableFuture 组合的情况,分为"与"和"或"(前面有过说明,但他们是更多的),所以有对应的Bi和Or开头的名称的类型的Completion(最终会继承的)类型
下面的代码分别为 UniRun、UniApply、UniAccept 的定义,可以看到,其内部分别封装了Runnable、Function、Consumer
static final class UniRun < T > extends UniCompletion < T , Void > {
Runnable fn;
UniRun ( Executor executor, CompletableFuture < Void > dep,
CompletableFuture < T > src, Runnable fn) {
super ( executor, dep, src) ; this . fn = fn;
static final class UniApply < T , V > extends UniCompletion < T , V > {
Function < ? super T , ? extends V > fn;
UniApply ( Executor executor, CompletableFuture < V > dep,
CompletableFuture < T > src,
Function < ? super T , ? extends V > fn) {
super ( executor, dep, src) ; this . fn = fn;
}
static final class UniAccept < T > extends UniCompletion < T , Void > {
Consumer < ? super T > fn;
UniAccept ( Executor executor, CompletableFuture < Void > dep,
CompletableFuture < T > src, Consumer < ? super T > fn) {
super ( executor, dep, src) ; this . fn = fn;
}
很明显构造方法中都进行赋值了,所以的确是封装的,所以他们基本都是利用内部类的,即如下图所示:
所以还是执行对应他们封装的方法的,只不过包装了而已,这个包装可以进行其他作用(虽然这里没有说明,一般可能与入栈有关,后面会说明,因为对应的参数是需要对应的内部类的类型的(内部类的父类类型,所以内部类可以被该参数指向,因为对于内部类来说,该参数是内部类的父类))
任务的链式执行过程分析:
下面以CompletableFuture.supplyAsync(…).thenApply(…).thenRun(…)链式代码为例,分析整个执行过程
第1步:CompletableFuture future1=CompletableFuture.supplyAsync(…)
public static < U > CompletableFuture < U > supplyAsync ( Supplier < U > supplier) {
return asyncSupplyStage ( ASYNC_POOL , supplier) ;
}
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool . commonPool ( ) : new ThreadPerTaskExecutor ( ) ;
static < U > CompletableFuture < U > asyncSupplyStage ( Executor e,
Supplier < U > f) {
if ( f == null ) throw new NullPointerException ( ) ;
CompletableFuture < U > d = new CompletableFuture < U > ( ) ;
e. execute ( new AsyncSupply < U > ( d, f) ) ;
return d;
}
在上面的代码中,关键是构造了一个AsyncSupply对象,该对象有三个关键点:
1:它继承自ForkJoinTask,所以能够提交ForkJoinPool来执行(e.execute提交任务的代码,虽然这里没有说明底层)
2:它封装了Supplier f,即它所执行任务的具体内容,所以对应的构造方法中基本就是操作任务的(虽然最终会到前面提交到的d)
3:该任务的返回值,即CompletableFuture d,也被封装在里面
ForkJoinPool执行一个ForkJoinTask类型的任务,即AsyncSupply,该任务的输入就是Supply(Supplier f,我们给的参数),输出结果存放 在CompletableFuture中
第2步:CompletableFuture future2=future1.thenApply(…)
第1步的返回值,也就是上面代码中的 CompletableFuture d,紧接着调用其成员方法thenApply:
public < U > CompletableFuture < U > thenApply (
Function < ? super T , ? extends U > fn) {
return uniApplyStage ( null , fn) ;
}
private < V > CompletableFuture < V > uniApplyStage (
Executor e, Function < ? super T , ? extends V > f) {
if ( f == null ) throw new NullPointerException ( ) ;
Object r;
if ( ( r = result) != null )
return uniApplyNow ( r, e, f) ;
CompletableFuture < V > d = newIncompleteFuture ( ) ;
unipush ( new UniApply < T , V > ( e, d, this , f) ) ;
return d;
}
我们知道,必须等第1步的任务执行完毕,第2步的任务才可以执行,因此,这里提交的任务不可能立即执行, 在此处构建了一个UniApply对象(是ForkJoinTask的子类,final void unipush(Completion c) {中Completion 是直接abstract static class Completion extends ForkJoinTask< Void>,即直接继承),也就是一个ForkJoinTask类型的任务,这个任务放入了第1个任务的栈当中,当他执行完自然在执行(从第一个开始)
final void unipush ( Completion c) {
if ( c != null ) {
while ( ! tryPushStack ( c) ) {
if ( result != null ) {
NEXT . set ( c, null ) ;
break ;
}
}
if ( result != null )
c. tryFire ( SYNC ) ;
}
}
每一个CompletableFuture对象内部都有一个栈,存储着是后续依赖它的任务,如下面代码所示,这个栈也就 是Treiber Stack,这里的stack存储的就是栈顶指针
volatile Object result;
volatile Completion stack;
上面的UniApply对象类似于第1步里面的AsyncSupply,它的构造方法传入了4个参数(unipush(new UniApply<T,V>(e, d, this, f));,后面的四个参数):
1:第1个参数是执行它的ForkJoinPool(ForkJoinPool是Executor的子类)
2:第2个参数是输出一个CompletableFuture对象,这个参数,也是thenApply方法的返回值,用来链式执 行下一个任务
3:第3个参数是其依赖的前置任务,也就是第1步里面提交的任务(所以是this,这样可以利用返回的对象来利用,也就是说,返回的CompletableFuture对象包含了返回值结果)
4:第4个参数是输入(也就是一个Function对象)
static final class UniApply < T , V > extends UniCompletion < T , V > {
Function < ? super T , ? extends V > fn;
UniApply ( Executor executor, CompletableFuture < V > dep,
CompletableFuture < T > src,
Function < ? super T , ? extends V > fn) {
super ( executor, dep, src) ; this . fn = fn;
}
UniApply对象被放入了第1步的CompletableFuture的栈中(第一个自然是AsyncSupply的任务提交),在第1步的任务执行完成之后,就会从栈中弹出 并执行,如下代码:
static final class AsyncSupply < T > extends ForkJoinTask < Void >
implements Runnable , AsynchronousCompletionTask {
CompletableFuture < T > dep; Supplier < ? extends T > fn;
AsyncSupply ( CompletableFuture < T > dep, Supplier < ? extends T > fn) {
this . dep = dep; this . fn = fn;
}
public final Void getRawResult ( ) { return null ; }
public final void setRawResult ( Void v) { }
public final boolean exec ( ) { run ( ) ; return false ; }
public void run ( ) {
CompletableFuture < T > d; Supplier < ? extends T > f;
if ( ( d = dep) != null && ( f = fn) != null ) {
dep = null ; fn = null ;
if ( d. result == null ) {
try {
d. completeValue ( f. get ( ) ) ;
} catch ( Throwable ex) {
d. completeThrowable ( ex) ;
}
}
d. postComplete ( ) ;
}
}
}
ForkJoinPool执行上面的AsyncSupply对象的run()方法,实质就是执行Supplier的get()方法(呼应之前入栈的set方法),执行结果被塞入 了 CompletableFuture d 当中,也就是赋值给了 CompletableFuture 内部的Object result变量,也就是解决了前面的这个:
if ( ( r = result) != null )
return uniApplyNow ( r, e, f) ;
调用d.postComplete(),也正是在这个方法里面,把第2步压入的UniApply对象弹出来执行,代码如下所示
final void postComplete ( ) {
CompletableFuture < ? > f = this ; Completion h;
while ( ( h = f. stack) != null ||
( f != this && ( h = ( f = this ) . stack) != null ) ) {
CompletableFuture < ? > d; Completion t;
if ( STACK . compareAndSet ( f, h, t = h. next) ) {
if ( t != null ) {
if ( f != this ) {
pushStack ( h) ;
continue ;
}
NEXT . compareAndSet ( h, t, null ) ;
}
f = ( d = h. tryFire ( NESTED ) ) == null ? this : d;
}
}
}
第3步:CompletableFuture future3=future2.thenRun()
第3步和第2步的过程类似,构建了一个 UniRun 对象,这个对象被压入第2步的CompletableFuture所在的栈 中的对应第2步的任务,当执行完成时,从自己的栈中弹出UniRun对象并执行
综上所述:
通过supplyAsync/thenApply/thenRun,分别提交了3个任务,每1个任务都有1个返回值对象,也就是1个CompletableFuture,这3个任务通过2个CompletableFuture完成串联,后1个任务,被放入了前1个任务的CompletableFuture里面,前1个任务在执行完成时,会从自己的栈中,弹出下1个任务执行,如此向后传递,完成 任务的链式执行
thenApply与thenApplyAsync的区别:
public < U > CompletableFuture < U > thenApply (
Function < ? super T , ? extends U > fn) {
return uniApplyStage ( null , fn) ;
}
public < U > CompletableFuture < U > thenApplyAsync (
Function < ? super T , ? extends U > fn) {
return uniApplyStage ( defaultExecutor ( ) , fn) ;
}
private < V > CompletableFuture < V > uniApplyStage (
Executor e, Function < ? super T , ? extends V > f) {
if ( f == null ) throw new NullPointerException ( ) ;
Object r;
if ( ( r = result) != null )
return uniApplyNow ( r, e, f) ;
CompletableFuture < V > d = newIncompleteFuture ( ) ;
unipush ( new UniApply < T , V > ( e, d, this , f) ) ;
return d;
}
对于上一个任务已经得出结果的情况,那么直接的执行return uniApplyNow(r, e, f);:
private < V > CompletableFuture < V > uniApplyNow (
Object r, Executor e, Function < ? super T , ? extends V > f) {
Throwable x;
CompletableFuture < V > d = newIncompleteFuture ( ) ;
if ( r instanceof AltResult ) {
if ( ( x = ( ( AltResult ) r) . ex) != null ) {
d. result = encodeThrowable ( x, r) ;
return d;
}
r = null ;
}
try {
if ( e != null ) {
e. execute ( new UniApply < T , V > ( null , d, this , f) ) ;
} else {
@SuppressWarnings ( "unchecked" ) T t = ( T ) r;
d. result = d. encodeValue ( f. apply ( t) ) ;
}
} catch ( Throwable ex) {
d. result = encodeThrowable ( ex) ;
}
return d;
}
如果e != null表示是thenApplyAsync,需要调用ForkJoinPool的execute方法,该方法:
public void execute ( Runnable task) {
if ( task == null )
throw new NullPointerException ( ) ;
ForkJoinTask < ? > job;
if ( task instanceof ForkJoinTask < ? > )
job = ( ForkJoinTask < ? > ) task;
else
job = new ForkJoinTask. RunnableExecuteAction ( task) ;
externalSubmit ( job) ;
}
private < T > ForkJoinTask < T > externalSubmit ( ForkJoinTask < T > task) {
Thread t; ForkJoinWorkerThread w; WorkQueue q;
if ( task == null )
throw new NullPointerException ( ) ;
if ( ( ( t = Thread . currentThread ( ) ) instanceof ForkJoinWorkerThread ) &&
( w = ( ForkJoinWorkerThread ) t) . pool == this &&
( q = w. workQueue) != null )
q. push ( task) ;
else
externalPush ( task) ;
return task;
}
通过上面的代码可以看到:
1:如果前置任务没有完成,即a.result=null,thenApply和thenApplyAsync都会将当前任务的下一个任务 入栈,然后再出栈执行
2:只有在当前任务已经完成的情况下,thenApply才会立即执行,不会入栈,再出栈,不会交给ForkJoinPool,thenApplyAsync还是将下一个任务封装为ForkJoinTask,入栈,之后出栈再执行,保证有顺序(应该是满足多线程的),注意:这里之前的清空返回值的说明还是存在的
同理,thenRun与thenRunAsync、thenAccept与thenAcceptAsync的区别与此类似
任务的网状执行:有向无环图:
如果任务只是链式执行,便不需要在每个CompletableFuture里面设1个栈了,用1个指针使所有任务组成链表 即可
但实际上,任务不(只)是链式执行,而是网状执行,组成 1 张图,如下图所示,所有任务组成一个有向无环图(有方向,但是没有有成环的):
任务一执行完成之后,任务二、任务三可以并行(前面有回答这个,不是真的并行哦,后面的并行说明还是正常并行的,因为多次执行的结果会有不一样的),在代码层面可以写为:future1.thenApply(任务二),future1.thenApply(任务三),不是逗号,那么
任务四在任务二执行完成时可开始执行
任务五要等待任务二、任务三都执行完成,才能开始,这里是AND关系(即相当于前面的allOf)
任务六在任务三执行完成时可以开始执行
对于任务七,只要任务四、任务五、任务六中任意一个任务结束,就可以开始执行
总而言之,任务之间是多对多的关系:1个任务有n个依赖它的后继任务(任务一的任务二和任务三),1个任务也有n个它依赖的前驱任务(任务五的任务二和任务三)
那么我们一般在前面学习时,有过AND关系(与),以及单纯的指向关系(链式),那么后继的有没有出现呢,答:实际上后继的就是OR关系,即有过案例了(或)
这样一个有向无环图,用什么样的数据结构表达呢?AND和OR的关系又如何表达呢?
有⼏个关键点:
1:在每个任务的返回值里面,存储了依赖它的接下来要执行的任务(栈),所以在上图中,任务一的CompletableFuture的栈中存储了任务二、任务三,而任务二的CompletableFuutre中存储了任务四、任务 五,任务三的CompletableFuture中存储了任务五、任务六,即每个任务的CompletableFuture对象的栈 里面,其实存储了该节点的出边对应的任务集合(前提是放入了栈),但是一般是会存储的,因为执行需要的时间通常大于判断时间(所以一般必然是判断的,即会放入栈)
2:任务二、任务三的CompletableFuture里面,都存储了任务五,那么任务五是不是会被触发两次,执行两 次呢? 任务五的确会被触发二次,但它会判断任务二、任务三的结果是不是都完成,如果只完成其中一个,它就 不会执行(具体我们并没有说明AND关系的底层原理,所以了解即可)
3:任务七存在于任务四、任务五、任务六的CompletableFuture的栈里面,因此会被触发三次,但它只会执 行一次,只要其中1个任务执行完成,就可以执行任务七了(即OR,我们也没有说明,所以了解即可)
4:正因为有AND和OR两种不同的关系,因此对应BiApply和OrApply两个对象,这两个对象的构造方法⼏乎 一样,只是在内部执行的时候,一个是AND的逻辑,一个是OR的逻辑(你可以在CompletableFutureDemo11中的run方法里面打印System.out.println(Thread.currentThread());来多次的执行(Thread.currentThread()可以获取当前线程的引用),就可以发现是不同的,即的确是并行的,所以由于是并行的(无论and还是or都是并行的,这里也可也在CompletableFutureDemo12中进行测试),那么我们的确只需要一个有返回值就行了)
static final class BiApply < T , U , V > extends BiCompletion < T , U , V > {
BiFunction < ? super T , ? super U , ? extends V > fn;
BiApply ( Executor executor, CompletableFuture < V > dep,
CompletableFuture < T > src, CompletableFuture < U > snd,
BiFunction < ? super T , ? super U , ? extends V > fn) {
super ( executor, dep, src, snd) ; this . fn = fn;
}
static final class OrApply < T , U extends T , V > extends BiCompletion < T , U , V > {
Function < ? super T , ? extends V > fn;
OrApply ( Executor executor, CompletableFuture < V > dep,
CompletableFuture < T > src, CompletableFuture < U > snd,
Function < ? super T , ? extends V > fn) {
super ( executor, dep, src, snd) ; this . fn = fn;
}
所以我们其实也可也出现这样:
BiApply和OrApply都是二元操作符,也就是说,只能传入二个被依赖的任务,但上面的任务七同时依赖 于任务四、任务五、任务六,这怎么处理呢?
任何一个多元操作,都能被转换为多个二元操作的叠加,如上图所示,假如任务一AND任务二AND任务 三 ==> 任务四,那么它可以被转换为右边的形式,新建了一个AND任务,这个AND任务(上面也成为任务一了)和任务三再作为 参数,构造任务四,OR的关系,与此类似,当然,前面说明的anyOf和allOf的任务基本也是这样,而正是因为可能会分成(必然)有两个一起,所以你在多次执行时,会出现不同(上面说明的在CompletableFutureDemo11或者CompletableFutureDemo12中加上对应的打印,在结果上,基本只是两个变化而已,看打印就知道了,必然前一个是3,后一个是5,那么基本只是他们直接变化,后面的组合也是如此,大体顺序不变)
此时,thenCombine的内部实现原理也就可以解释了,thenCombine用于任务一、任务二执行完成,再执行 任务三这样的操作,前面没有这样说明,这里补充一下,其中无论是and还是or他们都是并行的,但是thenCombine还是有顺序的,因为他没有操作and或者or
allOf内部的计算图分析(and):
下面以allOf方法为例,看一下有向无环计算图的内部运作过程:
public static CompletableFuture < Void > allOf ( CompletableFuture < ? > . . . cfs) {
return andTree ( cfs, 0 , cfs. length - 1 ) ;
}
static CompletableFuture < Void > andTree ( CompletableFuture < ? > [ ] cfs,
int lo, int hi) {
CompletableFuture < Void > d = new CompletableFuture < Void > ( ) ;
if ( lo > hi)
d. result = NIL ;
else {
CompletableFuture < ? > a, b; Object r, s, z; Throwable x;
int mid = ( lo + hi) >>> 1 ;
if ( ( a = ( lo == mid ? cfs[ lo] :
andTree ( cfs, lo, mid) ) ) == null ||
( b = ( lo == hi ? a : ( hi == mid+ 1 ) ? cfs[ hi] :
andTree ( cfs, mid+ 1 , hi) ) ) == null )
throw new NullPointerException ( ) ;
if ( ( r = a. result) == null || ( s = b. result) == null )
a. bipush ( b, new BiRelay < > ( d, a, b) ) ;
else if ( ( r instanceof AltResult
&& ( x = ( ( AltResult ) ( z = r) ) . ex) != null ) ||
( s instanceof AltResult
&& ( x = ( ( AltResult ) ( z = s) ) . ex) != null ) )
d. result = encodeThrowable ( x, z) ;
else
d. result = NIL ;
}
return d;
}
上面的方法是一个递归方法,输入是一个CompletableFuture对象的列表,输出是一个具有AND关系的复合CompletableFuture对象
最关键的代码:如上面加注释部分所示,因为d要等a,b都执行完成之后才能执行,因此d会被分别压入a,b所 在的栈中,通过这里的说明,我们可以得到一个原理,前面的返回值只要存在你的出栈完毕,那么就会赋值(最后操作的得到,上面就是b,所以b是返回的对应对象,自然得到的是他的返回值结果,虽然get是等待获取,但是实际上内部他们也是需要等待获取的,只是get是我们手动的获取的,而他们是直接的获取(T类型那里,这是因为不是最后一个),可以说get是一个简单的T类型操作,虽然这里没有说明),否则不会赋值,而出栈完毕,就是代表你的d全部出栈完毕
final void bipush ( CompletableFuture < ? > b, BiCompletion < ? , ? , ? > c) {
if ( c != null ) {
while ( result == null ) {
if ( tryPushStack ( c) ) {
if ( b. result == null )
b. unipush ( new CoCompletion ( c) ) ;
else if ( result != null )
c. tryFire ( SYNC ) ;
return ;
}
}
b. unipush ( c) ;
}
}
final void unipush ( Completion c) {
if ( c != null ) {
while ( ! tryPushStack ( c) ) {
if ( result != null ) {
NEXT . set ( c, null ) ;
break ;
}
}
if ( result != null )
c. tryFire ( SYNC ) ;
}
}
下图为allOf内部的运作过程,假设allof的参数传入了future1、future2、future3、future4(我们的案例中就是数组的形式,因为是可变长参数,我们看出一维数组即可),则对应四个原始 任务
⽣成BiRelay1、BiRelay2任务(固定的类任务,之前的也是对应的内部类操作,每个参数一个内部类任务提交,只是这里是多个),分别压入future1/future2、future3/future4的栈中,无论future1或future2完成,都会触发BiRelay1,无论future3或future4完成,都会触发BiRelay2,得到其中一个返回值操作
⽣成BiRelay3任务,压入future5/future6的栈中,无论future5或future6完成,都会触发BiRelay3任务
BiRelay只是一个中转任务,它本身没有任务代码,只是参照输入的两个future是否完成,如果完成,就从自己 的栈中弹出依赖它的BiRelay任务,然后执行
ForkJoinPool:
ForkJoinPool用法:
ForkJoinPool就是JDK7提供的一种"分治算法"(思想)的多线程并行计算框架,Fork意为分叉,Join意为合并,一分一 合,相互配合,形成分治算法(思想),此外,也可以将ForkJoinPool看作一个单机版的Map/Reduce,多个线程并行计 算
相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率,所以之前的都是使用ForkJoinPool来完成任务的处理,虽然ThreadPoolExecutor和ForkJoinPool的最终父类都是Executor接口,即基本都是直接继承AbstractExecutorService到继承ExecutorService到实现Executor
假设有5个任务,在ThreadPoolExecutor中有5个线程(核心为5)并行执行,其中一个任务的计算量很大,其余4个任务的 计算量很小,这会导致1个线程很忙,其他4个线程则容易处于空闲状态(核心线程是存在且没有销毁的)
利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而实现任务计 算的负载均衡
例子1:快排
快排有2个步骤:
1:利用数组的第1个元素把数组划分成两半,左边数组里面的元素小于或等于该元素,右边数组里面的元素 比该元素大
2:对左右的两个子数组分别排序
左右两个子数组相互独立可以并行计算,利用ForkJoinPool,代码如下:
package main7 ;
import java. util. Arrays ;
import java. util. concurrent. * ;
public class ForkJoinPoolDemo01 {
static class SortTask extends RecursiveAction {
final long [ ] array;
final int lo;
final int hi;
public SortTask ( long [ ] array) {
this . array = array;
this . lo = 0 ;
this . hi = array. length - 1 ;
}
public SortTask ( long [ ] array, int lo, int hi) {
this . array = array;
this . lo = lo;
this . hi = hi;
}
private int partition ( long [ ] array, int lo, int hi) {
long x = array[ hi] ;
int i = lo - 1 ;
for ( int j = lo; j < hi; j++ ) {
if ( array[ j] <= x) {
i++ ;
swap ( array, i, j) ;
}
}
swap ( array, i + 1 , hi) ;
return i + 1 ;
}
private void swap ( long [ ] array, int i, int j) {
if ( i != j) {
long temp = array[ i] ;
array[ i] = array[ j] ;
array[ j] = temp;
}
}
@Override
protected void compute ( ) {
if ( lo < hi) {
int pivot = partition ( array, lo, hi) ;
SortTask left = new SortTask ( array, lo, pivot - 1 ) ;
SortTask right = new SortTask ( array, pivot + 1 , hi) ;
Object fork = left. fork ( ) ;
Object fork1 = right. fork ( ) ;
System . out. println ( fork) ;
System . out. println ( fork1) ;
Object join = left. join ( ) ;
Object join1 = right. join ( ) ;
System . out. println ( join) ;
System . out. println ( join1) ;
}
}
}
public static void main ( String [ ] args) throws InterruptedException , ExecutionException {
long [ ] array = { 5 , 3 , 7 , 9 , 2 , 4 , 1 , 8 , 10 } ;
ForkJoinTask sort = new SortTask ( array) ;
ForkJoinPool pool = new ForkJoinPool ( ) ;
ForkJoinTask submit = pool. submit ( sort) ;
System . out. println ( submit) ;
System . out. println ( 2 ) ;
Object o = submit. get ( ) ;
System . out. println ( o+ "1" ) ;
pool. shutdown ( ) ;
pool. awaitTermination ( 10 , TimeUnit . SECONDS ) ;
System . out. println ( Arrays . toString ( array) ) ;
}
}
至此大致说明完毕,简单来说就是多个线程来操作对应的二分,而提高效率,而不是必须一步一步走,使得可能需要等待一边数组操作完毕才可操作另外一边
例子2:求1到n个数的和
package main7 ;
import java. util. concurrent. ExecutionException ;
import java. util. concurrent. ForkJoinPool ;
import java. util. concurrent. ForkJoinTask ;
import java. util. concurrent. RecursiveTask ;
public class ForkJoinPoolDemo02 {
static class SumTask extends RecursiveTask < Long > {
private static final int THRESHOLD = 10 ;
private long start;
private long end;
public SumTask ( long n) {
this ( 1 , n) ;
}
public SumTask ( long start, long end) {
this . start = start;
this . end = end;
}
@Override
protected Long compute ( ) {
long sum = 0 ;
if ( ( end - start) <= THRESHOLD ) {
for ( long l = start; l <= end; l++ ) {
sum += l;
}
} else {
long mid = ( start + end) >>> 1 ;
SumTask left = new SumTask ( start, mid) ;
SumTask right = new SumTask ( mid + 1 , end) ;
left. fork ( ) ;
right. fork ( ) ;
Long join = left. join ( ) ;
sum = left. join ( ) + right. join ( ) ;
}
try {
Thread . sleep ( 1000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
return sum;
}
}
public static void main ( String [ ] args) throws ExecutionException , InterruptedException {
SumTask sum = new SumTask ( 100 ) ;
ForkJoinPool pool = new ForkJoinPool ( ) ;
ForkJoinTask < Long > future = pool. submit ( sum) ;
Long aLong = future. get ( ) ;
System . out. println ( aLong) ;
pool. shutdown ( ) ;
}
}
所以可以发现,只要是可以进行分开的,基本都可以利用ForkJoinPool来进行优化,没有返回值也行,有返回值也行,都可以,虽然线程池也可以,但是他是封装好的,而其他线程池只是给出多个线程,但他们之间的联系没有给出,或者说只是操作任务而不会去自动的操作分开的任务,所以在这种情况下,一般我们都会用ForkJoinPool来进行优化,上面案例中大致指定流程即可,可能有细节问题,但这里就不多说了,其中join虽然是阻塞的,但是任务还是分开执行的,他只是使得当前线程不会执行完毕或者说等待任务返回结果而已,后面会说明的
上面的代码用到了 RecursiveAction 和 RecursiveTask 两个类,它们都继承自抽象类ForkJoinTask,用到了其 中关键的接口 fork()、join(),二者的区别是一个有返回值,一个没有返回值(RecursiveTask 里面有这个变量V result;,并且对应的方法有返回值,特别是compute方法,而不是RecursiveAction类的compute方法的空返回值,所以要操作返回值时,记得实现RecursiveTask)
public abstract class RecursiveAction extends ForkJoinTask < Void > {
public abstract class RecursiveTask < V > extends ForkJoinTask < V > {
RecursiveAction/RecursiveTask类继承关系:
白色实线箭头继承关系,白色虚线箭头实现关系
在ForkJoinPool中,对应参数的接口或者类如下:
public < T > ForkJoinTask < T > submit ( ForkJoinTask < T > task) {
return externalSubmit ( task) ;
}
核⼼数据结构:
与ThreadPoolExector不同的是,除一个全局的任务队列之外(直接调用的任务,上面的例子基本都是一个全局,多个局部),每个线程还有一个自己的局部队列(使得线程的确可以操作分开的任务,一般来说局部完成后,才操作全局)
核⼼数据结构如下所示:
public class ForkJoinPool extends AbstractExecutorService {
volatile long ctl;
WorkQueue [ ] workQueues;
final ForkJoinWorkerThreadFactory factory;
int indexSeed;
static final class WorkQueue {
volatile int source;
int id;
int base;
int top;
volatile int phase;
int stackPred;
int nsteals;
ForkJoinTask < ? > [ ] array;
final ForkJoinPool pool;
final ForkJoinWorkerThread owner;
}
}
public class ForkJoinWorkerThread extends Thread {
final ForkJoinPool pool;
final ForkJoinPool. WorkQueue workQueue;
}
下面看一下这些核⼼数据结构的构造过程
public ForkJoinPool ( ) {
this ( Math . min ( MAX_CAP , Runtime . getRuntime ( ) . availableProcessors ( ) ) ,
defaultForkJoinWorkerThreadFactory, null , false ,
0 , MAX_CAP , 1 , null , DEFAULT_KEEPALIVE , TimeUnit . MILLISECONDS ) ;
}
public ForkJoinPool ( int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode,
int corePoolSize,
int maximumPoolSize,
int minimumRunnable,
Predicate < ? super ForkJoinPool > saturate,
long keepAliveTime,
TimeUnit unit) {
if ( parallelism <= 0 || parallelism > MAX_CAP ||
maximumPoolSize < parallelism || keepAliveTime <= 0L )
throw new IllegalArgumentException ( ) ;
if ( factory == null )
throw new NullPointerException ( ) ;
long ms = Math . max ( unit. toMillis ( keepAliveTime) , TIMEOUT_SLOP ) ;
int corep = Math . min ( Math . max ( corePoolSize, parallelism) , MAX_CAP ) ;
long c = ( ( ( ( long ) ( - corep) << TC_SHIFT ) & TC_MASK ) |
( ( ( long ) ( - parallelism) << RC_SHIFT ) & RC_MASK ) ) ;
int m = parallelism | ( asyncMode ? FIFO : 0 ) ;
int maxSpares = Math . min ( maximumPoolSize, MAX_CAP ) - parallelism;
int minAvail = Math . min ( Math . max ( minimumRunnable, 0 ) , MAX_CAP ) ;
int b = ( ( minAvail - parallelism) & SMASK ) | ( maxSpares << SWIDTH ) ;
int n = ( parallelism > 1 ) ? parallelism - 1 : 1 ;
n |= n >>> 1 ; n |= n >>> 2 ; n |= n >>> 4 ; n |= n >>> 8 ; n |= n >>> 16 ;
n = ( n + 1 ) << 1 ;
this . workerNamePrefix = "ForkJoinPool-" + nextPoolId ( ) + "-worker-" ;
this . workQueues = new WorkQueue [ n] ;
this . factory = factory;
this . ueh = handler;
this . saturate = saturate;
this . keepAlive = ms;
this . bounds = b;
this . mode = m;
this . ctl = c;
checkPermission ( ) ;
}
工作窃取队列:
关于上面的全局队列,有一个关键点需要说明:它并⾮使用BlockingQueue,而是基于一个普通的数组得以实现
这个队列又名工作窃取队列,为 ForkJoinPool 的工作窃取算法提供服务,在 ForkJoinPool开篇的注释中,Doug Lea 特别提到了工作窃取队列的实现,其陈述来自如下两篇论文:"Dynamic Circular Work-Stealing Deque" by Chase and Lev,SPAA 2005与 " Idempotent work stealing" by Michael,Saraswat,and Vechev,PPoPP 2009",你可以在网上查阅相应论文(虽然这个资料也是网上找的,大多数博客也是这样说的)
所谓工作窃取算法,是指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任 务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙(运气不好,都被他拿到了,可以认为是CAS的抢占),这个过程要用到工作窃取队列,认为是继续操作任务(就如前面案例中线程操作分开的一样,只是该线程不是新的,而是原来的核心线程,因为我们认为对应的ForkJoinPool是类似于线程池的,所以相关作用基本相同,只是他的线程可以操作分开的任务)
这个队列只有如下⼏个操作:
1:Worker线程自己,在队列头部,通过对top指针执行加、减操作,实现入队或出队,这是单线程的(只是对该操作而言,他里面有锁,所以不需要CAS,前面说明对应的线程池时基本操作锁了)
2:其他Worker线程,在队列尾部,通过对base进行累加,实现出队操作,也就是窃取,这是多线程的,需要通过CAS操作(多个人窃取)
这个队列,在Dynamic Circular Work-Stealing Deque这篇论文中被称为dynamic-cyclic-array,之所以这样命 名,是因为有两个关键点:
1:整个队列是环形的,也就是一个数组实现的RingBuffer,并且base会一直累加,不会减小,top会累加或者减小,所以最后base、top的总值基本都会大于整个数组的长度,只是计算数组下标的时候(top是可以为0的),会取top&(queue.length-1),base&(queue.length-1),因为queue.length是2的整数次方,这里也就是对queue.length进行取模操作,当top-base=queue.length-1 的时候,队列为满(因为base是出队的,为加,而top入队为加),此时需要扩容,当top=base的时候,队列为空(即都出队了),Worker线程即将进入阻塞状态(核心的空闲)
2:当队列满了之后会扩容,且被称为是动态的,即动态扩容,但这就涉及一个棘⼿的问题:多个线程同时在读写这个队 列,如何实现在不加锁的情况下一边读写、一边扩容呢?,虽然说不加锁,但是基本还是利用锁的思想或者加上了乐观锁,所以一般来说不加的锁是说明不加悲观锁的操作的
这是大多数多线程扩容时要面临的问题,每个框架或者其他代码都有对应自己的解决方式(根据自己的作用来进行解决的),这里通过分析工作窃取队列的特性,我们会发现:在 base 一端,是多线程访问的,但它们只会使base变大,也就 是使队列中的元素变少,所以队列为满,一定发⽣在top一端,对top进行累加的时候,这一端却是单线程的,队列 的扩容恰好利用了这个单线程的特性,即在扩容过程中,不可能有其他线程对top进行修改,只有线程对base进行 修改!
下图为工作窃取队列扩容示意图,扩容之后,数组长度变成之前的二倍,但top、base的值是不变的,通过top、base对新的数组长度取模,仍然可以定位到元素(任务)在新数组中的位置
从上面可以看出,进行了扩容(创建新的队列),由于单线程这个地方不会有人进入,即不会影响扩容,且后面的扩容也利用了CAS,而之所以利用CAS是保证没有base不会影响,使得移动过去,所以综上所述,base和top都不会影响扩容
下面结合WorkQueue扩容的代码进一步分析:
public < T > ForkJoinTask < T > submit ( ForkJoinTask < T > task) {
return externalSubmit ( task) ;
}
private < T > ForkJoinTask < T > externalSubmit ( ForkJoinTask < T > task) {
Thread t; ForkJoinWorkerThread w; WorkQueue q;
if ( task == null )
throw new NullPointerException ( ) ;
if ( ( ( t = Thread . currentThread ( ) ) instanceof ForkJoinWorkerThread ) &&
( w = ( ForkJoinWorkerThread ) t) . pool == this &&
( q = w. workQueue) != null )
q. push ( task) ;
else
externalPush ( task) ;
return task;
}
final void push ( ForkJoinTask < ? > task) {
ForkJoinTask < ? > [ ] a;
int s = top, d, cap, m;
ForkJoinPool p = pool;
if ( ( a = array) != null && ( cap = a. length) > 0 ) {
QA . setRelease ( a, ( m = cap - 1 ) & s, task) ;
top = s + 1 ;
if ( ( ( d = s - ( int ) BASE . getAcquire ( this ) ) & ~ 1 ) == 0 &&
p != null ) {
VarHandle . fullFence ( ) ;
p. signalWork ( ) ;
}
else if ( d == m)
growArray ( false ) ;
}
}
final void growArray ( boolean locked) {
ForkJoinTask < ? > [ ] newA = null ;
try {
ForkJoinTask < ? > [ ] oldA; int oldSize, newSize;
if ( ( oldA = array) != null && ( oldSize = oldA. length) > 0 &&
( newSize = oldSize << 1 ) <= MAXIMUM_QUEUE_CAPACITY &&
newSize > 0 ) {
try {
newA = new ForkJoinTask < ? > [ newSize] ;
} catch ( OutOfMemoryError ex) {
}
if ( newA != null ) {
int oldMask = oldSize - 1 , newMask = newSize - 1 ;
for ( int s = top - 1 , k = oldMask; k >= 0 ; -- k) {
ForkJoinTask < ? > x = ( ForkJoinTask < ? > )
QA . getAndSet ( oldA, s & oldMask, null ) ;
if ( x != null )
newA[ s-- & newMask] = x;
else
break ;
}
array = newA;
VarHandle . releaseFence ( ) ;
}
}
} finally {
if ( locked)
phase = 0 ;
}
if ( newA == null )
throw new RejectedExecutionException ( "Queue capacity exceeded" ) ;
}
ForkJoinPool状态控制:
状态变量ctl解析:
类似于ThreadPoolExecutor,在ForkJoinPool中也有一个ctl变量负责表达ForkJoinPool的整个⽣命周期和相关的各种状态,不过这里的ctl变量更加复杂,是一个long型变量,代码如下所示
public class ForkJoinPool extends AbstractExecutorService {
volatile long ctl;
private static final long SP_MASK = 0 xffffffffL;
private static final long UC_MASK = ~ SP_MASK ;
private static final int RC_SHIFT = 48 ;
private static final long RC_UNIT = 0 x0001L << RC_SHIFT ;
private static final long RC_MASK = 0 xffffL << RC_SHIFT ;
private static final int TC_SHIFT = 32 ;
private static final long TC_UNIT = 0 x0001L << TC_SHIFT ;
private static final long TC_MASK = 0 xffffL << TC_SHIFT ;
private static final long ADD_WORKER = 0 x0001L << ( TC_SHIFT + 15 ) ;
}
public ForkJoinPool ( int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode,
int corePoolSize,
int maximumPoolSize,
int minimumRunnable,
Predicate < ? super ForkJoinPool > saturate,
long keepAliveTime,
TimeUnit unit) {
if ( parallelism <= 0 || parallelism > MAX_CAP ||
maximumPoolSize < parallelism || keepAliveTime <= 0L )
throw new IllegalArgumentException ( ) ;
if ( factory == null )
throw new NullPointerException ( ) ;
long ms = Math . max ( unit. toMillis ( keepAliveTime) , TIMEOUT_SLOP ) ;
int corep = Math . min ( Math . max ( corePoolSize, parallelism) , MAX_CAP ) ;
long c = ( ( ( ( long ) ( - corep) << TC_SHIFT ) & TC_MASK ) |
( ( ( long ) ( - parallelism) << RC_SHIFT ) & RC_MASK ) ) ;
int m = parallelism | ( asyncMode ? FIFO : 0 ) ;
int maxSpares = Math . min ( maximumPoolSize, MAX_CAP ) - parallelism;
int minAvail = Math . min ( Math . max ( minimumRunnable, 0 ) , MAX_CAP ) ;
int b = ( ( minAvail - parallelism) & SMASK ) | ( maxSpares << SWIDTH ) ;
int n = ( parallelism > 1 ) ? parallelism - 1 : 1 ;
n |= n >>> 1 ; n |= n >>> 2 ; n |= n >>> 4 ; n |= n >>> 8 ; n |= n >>> 16 ;
n = ( n + 1 ) << 1 ;
this . workerNamePrefix = "ForkJoinPool-" + nextPoolId ( ) + "-worker-" ;
this . workQueues = new WorkQueue [ n] ;
this . factory = factory;
this . ueh = handler;
this . saturate = saturate;
this . keepAlive = ms;
this . bounds = b;
this . mode = m;
this . ctl = c;
checkPermission ( ) ;
}
ctl变量的64个比特位被分成五部分:
1:AC:最高的16个比特位,表示Active线程数-parallelism,parallelism是上面的构造方法传进去的参数
2:TC:次高的16个比特位,表示Total线程数-parallelism
3:ST:1个比特位,如果是1,表示整个ForkJoinPool正在关闭
4:EC:15个比特位,表示阻塞栈的栈顶线程的wait count(关于什么是wait count,接下来解释)
5:ID:16个比特位,表示阻塞栈的栈顶线程对应的id
阻塞栈Treiber Stack(后面相关的知识只需要了解即可,因为只是大致说明,并没有很细节的说明 ):
什么叫阻塞栈呢?
要实现多个线程的阻塞、唤醒,除了park/unpark这一对操作原语,还需要一个无锁链表实现的阻塞队列,把 所有阻塞的线程串在一起
在ForkJoinPool中,没有使用阻塞队列,而是使用了阻塞栈,把所有空闲的Worker线程放在一个栈里面,这个 栈同样通过链表来实现,名为Treiber Stack,前面讲解Phaser的实现原理的时候,也用过这个数据结构
下图为所有阻塞的Worker线程组成的Treiber Stack
⾸先,WorkQueue有一个id变量,记录了自己在WorkQueue[]数组中的下标位置,id变量就相当于每个WorkQueue或ForkJoinWorkerThread对象的地址
WorkQueue [ ] workQueues;
int id;
其次,ForkJoinWorkerThread还有一个stackPred变量,记录了前一个阻塞线程的id,这个stackPred变量就 相当于链表的next指针,把所有的阻塞线程串联在一起,组成一个Treiber Stack
最后,ctl变量的最低16位,记录了栈的栈顶线程的id,中间的15位,记录了栈顶线程被阻塞的次数,也称为wait count
是否感觉所有的id都是串起来的一样的,也的确是这样的
ctl变量的初始值:
构造方法中,有如下的代码:
long c = ( ( ( ( long ) ( - corep) << TC_SHIFT ) & TC_MASK ) |
( ( ( long ) ( - parallelism) << RC_SHIFT ) & RC_MASK ) ) ;
因为在初始的时候,ForkJoinPool 中的线程个数为 0,所以 AC=0-parallelism,TC=0-parallelism,这意味着 只有高32位的AC、TC 两个部分填充了值,低32位都是0填充
ForkJoinWorkerThread状态与个数分析:
public class ForkJoinWorkerThread extends Thread {
在ThreadPoolExecutor中,有corePoolSize和maxmiumPoolSize 两个参数联合控制总的线程数,而在ForkJoinPool中只传入了一个parallelism参数,且这个参数并不是实际的线程数,那么,ForkJoinPool在实际的运 行过程中,线程数究竟是由哪些因素决定的呢?
要回答这个问题,先得明⽩ForkJoinPool中的线程都可能有哪⼏种状态?可能的状态有三种:
1:空闲状态(放在Treiber Stack里面)
2:活跃状态(正在执行某个ForkJoinTask,未阻塞)
3:阻塞状态(正在执行某个ForkJoinTask,但阻塞了,于是调用join(当前线程调用新的线程,并等待新的线程返回值,因为join是等待调用结果的),等待另外一个任务的结果返回,即join实际上相当于get方法的阻塞,只是他是内部的,所以只是他是直接获取而已,而不是我们的手动获取,虽然在前面也有过类似的说明(T类型那个地方)),即有活跃那么自然有阻塞,他们是互相的,这里也说明了如果有多个join,可能前面的阻塞会导致他并不能立即得到结果,所以一般我们最好进行二分,否则效率并不会比单纯的一个线程执行要快很多,虽然也是快的,最差也是相等或者少一丢丢(代码很多),就如单线程是1,那么这里是[0.9,10]这个范围
ctl变量很好地反映出了三种状态:
高32位(没有说明是什么位的,那么代表是前面的32位,而说明的,可以说成两个低32位,前面就有说成"32位中 的两个低16位的地方"):u=(int) (ctl >>> 32),然后u又拆分成tc、ac 两个16位,这里当成高32位的值
低32位:c=(int) ctl,这里当成低32位的值
c>0,说明Treiber Stack不为空,有空闲线程,c=0,说明没有空闲线程
2:ac>0,说明有活跃线程,ac<=0,说明没有空闲线程,并且还未超出parallelism
3:tc>0,说明总线程数 >parallelism
在提交任务的时候:
public < T > ForkJoinTask < T > submit ( ForkJoinTask < T > task) {
return externalSubmit ( task) ;
}
private < T > ForkJoinTask < T > externalSubmit ( ForkJoinTask < T > task) {
Thread t; ForkJoinWorkerThread w; WorkQueue q;
if ( task == null )
throw new NullPointerException ( ) ;
if ( ( ( t = Thread . currentThread ( ) ) instanceof ForkJoinWorkerThread ) &&
( w = ( ForkJoinWorkerThread ) t) . pool == this &&
( q = w. workQueue) != null )
q. push ( task) ;
else
externalPush ( task) ;
return task;
}
final void externalPush ( ForkJoinTask < ? > task) {
int r;
if ( ( r = ThreadLocalRandom . getProbe ( ) ) == 0 ) {
ThreadLocalRandom . localInit ( ) ;
r = ThreadLocalRandom . getProbe ( ) ;
}
for ( ; ; ) {
WorkQueue q;
int md = mode, n;
WorkQueue [ ] ws = workQueues;
if ( ( md & SHUTDOWN ) != 0 || ws == null || ( n = ws. length) <= 0 )
throw new RejectedExecutionException ( ) ;
else if ( ( q = ws[ ( n - 1 ) & r & SQMASK ] ) == null ) {
int qid = ( r | QUIET ) & ~ ( FIFO | OWNED ) ;
Object lock = workerNamePrefix;
ForkJoinTask < ? > [ ] qa =
new ForkJoinTask < ? > [ INITIAL_QUEUE_CAPACITY ] ;
q = new WorkQueue ( this , null ) ;
q. array = qa;
q. id = qid;
q. source = QUIET ;
if ( lock != null ) {
synchronized ( lock) {
WorkQueue [ ] vs; int i, vn;
if ( ( vs = workQueues) != null && ( vn = vs. length) > 0 &&
vs[ i = qid & ( vn - 1 ) & SQMASK ] == null )
vs[ i] = q;
}
}
}
else if ( ! q. tryLockPhase ( ) )
r = ThreadLocalRandom . advanceProbe ( r) ;
else {
if ( q. lockedPush ( task) )
signalWork ( ) ;
return ;
}
}
}
在通知工作线程的时候,需要判断ctl的状态,如果没有闲置的线程(一般没有线程也是一样会开启线程),则开启新线程:
final void signalWork ( ) {
for ( ; ; ) {
long c; int sp; WorkQueue [ ] ws; int i; WorkQueue v;
if ( ( c = ctl) >= 0L )
break ;
else if ( ( sp = ( int ) c) == 0 ) {
if ( ( c & ADD_WORKER ) != 0L )
tryAddWorker ( c) ;
break ;
}
else if ( ( ws = workQueues) == null )
break ;
else if ( ws. length <= ( i = sp & SMASK ) )
break ;
else if ( ( v = ws[ i] ) == null )
break ;
else {
int np = sp & ~ UNSIGNALLED ;
int vp = v. phase;
long nc = ( v. stackPred & SP_MASK ) | ( UC_MASK & ( c + RC_UNIT ) ) ;
Thread vt = v. owner;
if ( sp == vp && CTL . compareAndSet ( this , c, nc) ) {
v. phase = np;
if ( vt != null && v. source < 0 )
LockSupport . unpark ( vt) ;
break ;
}
}
}
}
Worker线程的阻塞-唤醒机制:
ForkerJoinPool 没有使用 BlockingQueue,也就不利用其阻塞/唤醒机制来操作线程之间的唤醒或者阻塞,而是利用了park/unpark原语,并 自行实现了Treiber Stack(队列一般还是阻塞队列的)
下面进行详细分析ForkerJoinPool,在阻塞和唤醒的时候,分别是如何入栈的
阻塞–入栈:
当一个线程窃取不到任何任务,也就是处于空闲状态时就会阻塞入栈(之所以要阻塞栈,是要完成线程之间的唤醒,前面说明了阻塞栈的定义,并且也为了唤醒并得到join的对应返回值的成功的操作)
public void run ( ) {
if ( workQueue. array == null ) {
Throwable exception = null ;
try {
onStart ( ) ;
pool. runWorker ( workQueue) ;
} catch ( Throwable ex) {
exception = ex;
} finally {
try {
onTermination ( exception) ;
} catch ( Throwable ex) {
if ( exception == null )
exception = ex;
} finally {
pool. deregisterWorker ( this , exception) ;
}
}
}
}
final void runWorker ( WorkQueue w) {
int r = ( w. id ^ ThreadLocalRandom . nextSecondarySeed ( ) ) | FIFO ;
w. array = new ForkJoinTask < ? > [ INITIAL_QUEUE_CAPACITY ] ;
for ( ; ; ) {
int phase;
if ( scan ( w, r) ) {
r ^= r << 13 ; r ^= r >>> 17 ; r ^= r << 5 ;
}
else if ( ( phase = w. phase) >= 0 ) {
long np = ( w. phase = ( phase + SS_SEQ ) | UNSIGNALLED ) & SP_MASK ;
long c, nc;
do {
w. stackPred = ( int ) ( c = ctl) ;
nc = ( ( c - RC_UNIT ) & UC_MASK ) | np;
} while ( ! CTL . weakCompareAndSet ( this , c, nc) ) ;
}
else {
int pred = w. stackPred;
Thread . interrupted ( ) ;
w. source = DORMANT ;
long c = ctl;
int md = mode, rc = ( md & SMASK ) + ( int ) ( c >> RC_SHIFT ) ;
if ( md < 0 )
break ;
else if ( rc <= 0 && ( md & SHUTDOWN ) != 0 &&
tryTerminate ( false , false ) )
break ;
else if ( rc <= 0 && pred != 0 && phase == ( int ) c) {
long nc = ( UC_MASK & ( c - TC_UNIT ) ) | ( SP_MASK & pred) ;
long d = keepAlive + System . currentTimeMillis ( ) ;
LockSupport . parkUntil ( this , d) ;
if ( ctl == c &&
d - System . currentTimeMillis ( ) <= TIMEOUT_SLOP &&
CTL . compareAndSet ( this , c, nc) ) {
w. phase = QUIET ;
break ;
}
}
else if ( w. phase < 0 )
LockSupport . park ( this ) ;
w. source = 0 ;
}
}
}
private boolean scan ( WorkQueue w, int r) {
WorkQueue [ ] ws; int n;
if ( ( ws = workQueues) != null && ( n = ws. length) > 0 && w != null ) {
for ( int m = n - 1 , j = r & m; ; ) {
WorkQueue q; int b;
if ( ( q = ws[ j] ) != null && q. top != ( b = q. base) ) {
int qid = q. id;
ForkJoinTask < ? > [ ] a; int cap, k; ForkJoinTask < ? > t;
if ( ( a = q. array) != null && ( cap = a. length) > 0 ) {
t = ( ForkJoinTask < ? > ) QA . getAcquire ( a, k = ( cap - 1 ) & b) ;
if ( q. base == b++ && t != null &&
QA . compareAndSet ( a, k, t, null ) ) {
q. base = b;
w. source = qid;
if ( q. top - b > 0 )
signalWork ( ) ;
w. topLevelExec ( t, q,
r & ( ( n << TOP_BOUND_SHIFT ) - 1 ) ) ;
}
}
return true ;
}
else if ( -- n > 0 )
j = ( j + 1 ) & m;
else
break ;
}
}
return false ;
}
唤醒–出栈:
在新的任务到来之后,空闲的线程被唤醒,其核⼼逻辑在signalWork方法里面
final void signalWork ( ) {
for ( ; ; ) {
long c; int sp; WorkQueue [ ] ws; int i; WorkQueue v;
if ( ( c = ctl) >= 0L )
break ;
else if ( ( sp = ( int ) c) == 0 ) {
if ( ( c & ADD_WORKER ) != 0L )
tryAddWorker ( c) ;
break ;
}
else if ( ( ws = workQueues) == null )
break ;
else if ( ws. length <= ( i = sp & SMASK ) )
break ;
else if ( ( v = ws[ i] ) == null )
break ;
else {
int np = sp & ~ UNSIGNALLED ;
int vp = v. phase;
long nc = ( v. stackPred & SP_MASK ) | ( UC_MASK & ( c + RC_UNIT ) ) ;
Thread vt = v. owner;
if ( sp == vp && CTL . compareAndSet ( this , c, nc) ) {
v. phase = np;
if ( vt != null && v. source < 0 )
LockSupport . unpark ( vt) ;
break ;
}
}
}
}
任务的提交过程分析:
在明⽩了工作窃取队列、ctl变量的各种状态、Worker的各种状态,以及线程阻塞—唤醒机制之后,接下来综 合这些知识,详细分析任务的提交和执行过程
关于任务的提交,ForkJoinPool最外层的接口如下所示
public < T > ForkJoinTask < T > submit ( ForkJoinTask < T > task) {
return externalSubmit ( task) ;
}
private < T > ForkJoinTask < T > externalSubmit ( ForkJoinTask < T > task) {
Thread t; ForkJoinWorkerThread w; WorkQueue q;
if ( task == null )
throw new NullPointerException ( ) ;
if ( ( ( t = Thread . currentThread ( ) ) instanceof ForkJoinWorkerThread ) &&
( w = ( ForkJoinWorkerThread ) t) . pool == this &&
( q = w. workQueue) != null )
q. push ( task) ;
else
externalPush ( task) ;
return task;
}
如何区分一个任务是内部任务,还是外部任务呢?
可以通过调用该方法的线程类型判断
如果线程类型是ForkJoinWorkerThread,说明是线程池内部的某个线程在调用该方法,则把该任务放入该线 程的局部队列(简单来说就是由线程开启的线程才会放入局部,而这样开启的线程,是ForkJoinWorkerThread类型的,而不是我们的Thread类型,虽然ForkJoinWorkerThread继承Thread
否则,是外部线程在调用该方法,则将该任务加入全局队列,注意,我们说的阻塞队列是对线程的任务保存的,而线程本身是自己保存的,我们只需要拿取即可,就如连接池一样,之所以可以空闲线程入栈,是因为除了在运行中可以被赋值外(他只是针对线程来说的,赋值不会改变线程),更重要的是入栈可以操作唤醒(可以赋值,但是为了保证可以操作返回值,我们需要对线程之间进行唤醒,而不是任务唤醒线程,即需要多个(另外一个)唤醒操作了)
这里也要注意:队列是有顺序的,也就是说任务是有顺序的给,只是线程多了而已,虽然线程有线程集合,但是他们之间是抢占,虽然并没有具体说明,这里了解即可
内部提交任务push:
内部提交任务,即上面的q.push(task),会放入该线程的工作窃取队列中,代码如下所示
final void push ( ForkJoinTask < ? > task) {
ForkJoinTask < ? > [ ] a;
int s = top, d, cap, m;
ForkJoinPool p = pool;
if ( ( a = array) != null && ( cap = a. length) > 0 ) {
QA . setRelease ( a, ( m = cap - 1 ) & s, task) ;
top = s + 1 ;
if ( ( ( d = s - ( int ) BASE . getAcquire ( this ) ) & ~ 1 ) == 0 &&
p != null ) {
VarHandle . fullFence ( ) ;
p. signalWork ( ) ;
}
else if ( d == m)
growArray ( false ) ;
}
}
由于工作窃取队列的特性,操作是单线程的,所以此处不需要执行CAS操作
外部提交任务:
final void externalPush ( ForkJoinTask < ? > task) {
int r;
if ( ( r = ThreadLocalRandom . getProbe ( ) ) == 0 ) {
ThreadLocalRandom . localInit ( ) ;
r = ThreadLocalRandom . getProbe ( ) ;
}
for ( ; ; ) {
WorkQueue q;
int md = mode, n;
WorkQueue [ ] ws = workQueues;
if ( ( md & SHUTDOWN ) != 0 || ws == null || ( n = ws. length) <= 0 )
throw new RejectedExecutionException ( ) ;
else if ( ( q = ws[ ( n - 1 ) & r & SQMASK ] ) == null ) {
int qid = ( r | QUIET ) & ~ ( FIFO | OWNED ) ;
Object lock = workerNamePrefix;
ForkJoinTask < ? > [ ] qa =
new ForkJoinTask < ? > [ INITIAL_QUEUE_CAPACITY ] ;
q = new WorkQueue ( this , null ) ;
q. array = qa;
q. id = qid;
q. source = QUIET ;
if ( lock != null ) {
synchronized ( lock) {
WorkQueue [ ] vs; int i, vn;
if ( ( vs = workQueues) != null && ( vn = vs. length) > 0 &&
vs[ i = qid & ( vn - 1 ) & SQMASK ] == null )
vs[ i] = q;
}
}
}
else if ( ! q. tryLockPhase ( ) )
r = ThreadLocalRandom . advanceProbe ( r) ;
else {
if ( q. lockedPush ( task) )
signalWork ( ) ;
return ;
}
}
}
lockedPush(task)方法的实现:
final boolean lockedPush ( ForkJoinTask < ? > task) {
ForkJoinTask < ? > [ ] a;
boolean signal = false ;
int s = top, b = base, cap, d;
if ( ( a = array) != null && ( cap = a. length) > 0 ) {
a[ ( cap - 1 ) & s] = task;
top = s + 1 ;
if ( b - s + cap - 1 == 0 )
growArray ( true ) ;
else {
phase = 0 ;
if ( ( ( s - base) & ~ 1 ) == 0 )
signal = true ;
}
}
return signal;
}
外部多个线程会调用该方法,所以要加锁,入队列和扩容的逻辑和线程内部的队列基本相同,最后,调用signalWork(),通知一个空闲线程来取
工作窃取算法:任务的执行过程分析:
全局队列有任务,局部队列也有任务,每一个Worker线程都会不间断地扫描这些队列,窃取任务来执行(也就是说局部队列也会进行窃取,但是可能也只是由对应的一个线程执行完毕,但是一般我认为是窃取的),下 面从Worker线程的run方法开始分析:
public void run ( ) {
if ( workQueue. array == null ) {
Throwable exception = null ;
try {
onStart ( ) ;
pool. runWorker ( workQueue) ;
} catch ( Throwable ex) {
exception = ex;
} finally {
try {
onTermination ( exception) ;
} catch ( Throwable ex) {
if ( exception == null )
exception = ex;
} finally {
pool. deregisterWorker ( this , exception) ;
}
}
}
}
run()方法调用的是所在ForkJoinPool的runWorker方法,如下所示
final void runWorker ( WorkQueue w) {
int r = ( w. id ^ ThreadLocalRandom . nextSecondarySeed ( ) ) | FIFO ;
w. array = new ForkJoinTask < ? > [ INITIAL_QUEUE_CAPACITY ] ;
for ( ; ; ) {
int phase;
if ( scan ( w, r) ) {
r ^= r << 13 ; r ^= r >>> 17 ; r ^= r << 5 ;
}
else if ( ( phase = w. phase) >= 0 ) {
long np = ( w. phase = ( phase + SS_SEQ ) | UNSIGNALLED ) & SP_MASK ;
long c, nc;
do {
w. stackPred = ( int ) ( c = ctl) ;
nc = ( ( c - RC_UNIT ) & UC_MASK ) | np;
} while ( ! CTL . weakCompareAndSet ( this , c, nc) ) ;
}
else {
int pred = w. stackPred;
Thread . interrupted ( ) ;
w. source = DORMANT ;
long c = ctl;
int md = mode, rc = ( md & SMASK ) + ( int ) ( c >> RC_SHIFT ) ;
if ( md < 0 )
break ;
else if ( rc <= 0 && ( md & SHUTDOWN ) != 0 &&
tryTerminate ( false , false ) )
break ;
else if ( rc <= 0 && pred != 0 && phase == ( int ) c) {
long nc = ( UC_MASK & ( c - TC_UNIT ) ) | ( SP_MASK & pred) ;
long d = keepAlive + System . currentTimeMillis ( ) ;
LockSupport . parkUntil ( this , d) ;
if ( ctl == c &&
d - System . currentTimeMillis ( ) <= TIMEOUT_SLOP &&
CTL . compareAndSet ( this , c, nc) ) {
w. phase = QUIET ;
break ;
}
}
else if ( w. phase < 0 )
LockSupport . park ( this ) ;
w. source = 0 ;
}
}
}
下面详细看扫描过程scan(w, a):
private boolean scan ( WorkQueue w, int r) {
WorkQueue [ ] ws; int n;
if ( ( ws = workQueues) != null && ( n = ws. length) > 0 && w != null ) {
for ( int m = n - 1 , j = r & m; ; ) {
WorkQueue q; int b;
if ( ( q = ws[ j] ) != null && q. top != ( b = q. base) ) {
int qid = q. id;
ForkJoinTask < ? > [ ] a; int cap, k; ForkJoinTask < ? > t;
if ( ( a = q. array) != null && ( cap = a. length) > 0 ) {
t = ( ForkJoinTask < ? > ) QA . getAcquire ( a, k = ( cap - 1 ) & b) ;
if ( q. base == b++ && t != null &&
QA . compareAndSet ( a, k, t, null ) ) {
q. base = b;
w. source = qid;
if ( q. top - b > 0 )
signalWork ( ) ;
w. topLevelExec ( t, q,
r & ( ( n << TOP_BOUND_SHIFT ) - 1 ) ) ;
}
}
return true ;
}
else if ( -- n > 0 )
j = ( j + 1 ) & m;
else
break ;
}
}
return false ;
}
对应的方法前面说明过了,且看起来也去局部窃取了,即顶级包含所有
ForkJoinTask的fork/join:
如果局部队列、全局中的任务全部是相互独立的,就很简单了,但问题是,对于分治算法来说,分解出来的一 个个任务并不是独立的,而是相互依赖,一个任务的完成要依赖另一个前置任务的完成
这种依赖关系是通过ForkJoinTask中的join()来体现的,且看前面的代码:
protected void compute ( ) {
if ( lo < hi) {
int pivot = partition ( array, lo, hi) ;
SortTask left = new SortTask ( array, lo, pivot - 1 ) ;
SortTask right = new SortTask ( array, pivot + 1 , hi) ;
left. fork ( ) ;
right. fork ( ) ;
left. join ( ) ;
right. join ( ) ;
}
}
线程在执行当前ForkJoinTask的时候,产⽣了left、right 两个子Task
fork是指把这两个子Task放入队列里面,也就是任务放在队列使得执行(局部队列,并且是对应的线程来放的,开启线程,有队列,然后执行,前面说明过大体意思"是先开启,然后队列,然后线程执行")
join则是要等待2个子Task完成
而子Task在执行过程中,会再次产⽣两个子Task,如此层层嵌套,类似于递归调用,直到最底层的Task计算 完成,再一级级返回
fork:
fork()的代码很简单,就是把自己放入当前线程所在的局部队列中
如果是外部线程调用fork方法,则直接将任务添加到共享队列中
public final ForkJoinTask < V > fork ( ) {
Thread t;
if ( ( t = Thread . currentThread ( ) ) instanceof ForkJoinWorkerThread )
( ( ForkJoinWorkerThread ) t) . workQueue. push ( this ) ;
else
ForkJoinPool . common. externalPush ( this ) ;
return this ;
}
join的嵌套:
join的层层嵌套阻塞原理:
join会导致线程的层层嵌套阻塞,如图所示:
线程1在执行 ForkJoinTask1,在执行过程中调用了 forkJoinTask2.join(),所以要等ForkJoinTask2完成,线程1才能返回
线程2在执行ForkJoinTask2,但由于调用了forkJoinTask3.join(),只有等ForkJoinTask3完成后,线程2才能返 回
线程3在执行ForkJoinTask3
结果是:线程3⾸先执行完,然后线程2才能执行完,最后线程1再执行完,所有的任务其实组成一个有向无环 图DAG,如果线程3调用了forkJoinTask1.join(),那么会形成环,造成死锁,相当于在线程3中操作无限循环(在代码中可以选择保存一个join的调用者,在一定的情况下,突然执行就行了,当然,无论是操作返回值的类型还是默认返回null,都会阻塞,只不过没有操作返回值的没有设置返回值而已,相当于他们需要通知,即这个阻塞是操作他们的,即RecursiveTask和RecursiveAction的)
那么,这种层次依赖、层次通知的 DAG,在 ForkJoinTask 内部是如何实现的呢?站在ForkJoinTask的角度来 看,每个ForkJoinTask,都可能有多个线程在等待它完成,有1个线程在执行它,所以每个ForkJoinTask就是一个 同步对象,线程在调用join()的时候,阻塞在这个同步对象上面,执行完成之后,再通过这个同步对象通知所有等 待的线程(虽然一般只是直接的一个线程,但是总体来说是后面的所有),实际上在main方法中操作多线程,只是多出一个线程池而已,即不会影响的
利用synchronized关键字和Java原⽣的wait()/notify()机制,实现了线程的等待-唤醒机制,调用join()的这些线 程,内部其实是调用ForkJoinTask这个对象的wait(),执行该任务的Worker线程,在任务执行完毕之后,顺便调用notifyAll()
那么可以知道,如果在其他的线程中也操作了join(多个线程中存在join),那么一起唤醒,虽然在案例中没有给出测试(可以在对应的方法里面,手动开一个线程,执行join就知道了)
ForkJoinTask的状态解析:
要实现fork()/join()的这种线程间的同步,对应的ForkJoinTask一定是有各种状态的,这个状态变量是实现fork/join的基础
public abstract class ForkJoinTask < V > implements Future < V > , Serializable {
volatile int status;
private static final int DONE = 1 << 31 ;
private static final int ABNORMAL = 1 << 18 ;
private static final int THROWN = 1 << 17 ;
private static final int SIGNAL = 1 << 16 ;
private static final int SMASK = 0xffff ;
}
初始时,status=0,共有五种状态,可以分为两大类:
1:未完成:status>=0
2:已完成:status<0
所以,通过判断是status>=0,还是status<0,就可知道任务是否完成,进而决定调用join()的线程是否需要 被阻塞
join的详细实现:
public final V join ( ) {
int s;
if ( ( ( s = doJoin ( ) ) & ABNORMAL ) != 0 )
reportException ( s) ;
return getRawResult ( ) ;
}
getRawResult()是ForkJoinTask中的一个模板方法,分别被RecursiveAction和RecursiveTask实现,前者没有 返回值,所以返回null,后者返回一个类型为V的result变量,看对应的RecursiveAction或者RecursiveTask里面的方法就知道了,虽然对应的RecursiveAction泛型默认是Void类型,而由于RecursiveAction有默认返回,所以不用设置,他们都操作阻塞,所以在对应的方法中,可以看到join方法的内容基本是一样的,只是在内部对应的RecursiveAction可能并没有进行赋值,而RecursiveTask可能操作了赋值(可能按照类型来的,这里了解即可,因为没有具体说明)
阻塞主要发⽣在上面的doJoin()方法里面,在dojoin()里调用t.join()的线程会阻塞
然后等待任务t执行完成, 再唤醒该阻塞线程,doJoin()返回,使得最终返回结果(会看看是否正常结束)
注意:当 doJoin()返回的时候,就是该任务执行完成的时候,doJoin()的返回值就是任务的完成状态,也就是 上面的⼏种状态
private int doJoin ( ) {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool. WorkQueue w;
return ( s = status) < 0 ? s :
( ( t = Thread . currentThread ( ) ) instanceof ForkJoinWorkerThread ) ?
( w = ( wt = ( ForkJoinWorkerThread ) t) . workQueue) .
tryUnpush ( this ) && ( s = doExec ( ) ) < 0 ? s :
wt. pool. awaitJoin ( w, this , 0L ) :
externalAwaitDone ( ) ;
}
上面的返回值可读性比较差,变形之后:
if ( ( s = status) < 0 ) {
return s;
} else {
if ( ( t = Thread . currentThread ( ) ) instanceof ForkJoinWorkerThread ) {
if ( w = ( wt = ( ForkJoinWorkerThread ) t) . workQueue) . tryUnpush ( this ) && ( s = doExec ( ) ) < 0 ) {
return s;
} else {
wt. pool. awaitJoin ( w, this , 0L )
}
} else {
externalAwaitDone ( ) ;
}
}
先看一下externalAwaitDone(),即外部线程的阻塞过程,相对简单
private int externalAwaitDone ( ) {
int s = tryExternalHelp ( ) ;
if ( s >= 0 && ( s = ( int ) STATUS . getAndBitwiseOr ( this , SIGNAL ) ) >= 0 ) {
boolean interrupted = false ;
synchronized ( this ) {
for ( ; ; ) {
if ( ( s = status) >= 0 ) {
try {
wait ( 0L ) ;
} catch ( InterruptedException ie) {
interrupted = true ;
}
}
else {
notifyAll ( ) ;
break ;
}
}
}
if ( interrupted)
Thread . currentThread ( ) . interrupt ( ) ;
}
return s;
}
内部Worker线程的阻塞,即上面的wt.pool.awaitJoin(w, this, 0L),相比外部线程的阻塞要做更多工作(因为对应的线程类型与外部的是不一样的,自然需要考虑该类型的一些相关的问题,就如窃取作用,也只能是窃取他们之间的队列,而不是全局队列,全局队列由他们自身来窃取的,即使得全局不窃取局部,只是他们之间的关系,并且局部有很多,所以比较复杂),它现 不在ForkJoinTask里面,而是在ForkJoinWorkerThread里面
final int awaitJoin ( WorkQueue w, ForkJoinTask < ? > task, long deadline) {
int s = 0 ;
int seed = ThreadLocalRandom . nextSecondarySeed ( ) ;
if ( w != null && task != null &&
( ! ( task instanceof CountedCompleter ) ||
( s = w. helpCC ( ( CountedCompleter < ? > ) task, 0 , false ) ) >= 0 ) ) {
w. tryRemoveAndExec ( task) ;
int src = w. source, id = w. id;
int r = ( seed >>> 16 ) | 1 , step = ( seed & ~ 1 ) | 2 ;
s = task. status;
while ( s >= 0 ) {
WorkQueue [ ] ws;
int n = ( ws = workQueues) == null ? 0 : ws. length, m = n - 1 ;
while ( n > 0 ) {
WorkQueue q; int b;
if ( ( q = ws[ r & m] ) != null && q. source == id &&
q. top != ( b = q. base) ) {
ForkJoinTask < ? > [ ] a; int cap, k;
int qid = q. id;
if ( ( a = q. array) != null && ( cap = a. length) > 0 ) {
ForkJoinTask < ? > t = ( ForkJoinTask < ? > )
QA . getAcquire ( a, k = ( cap - 1 ) & b) ;
if ( q. source == id && q. base == b++ &&
t != null && QA . compareAndSet ( a, k, t, null ) ) {
q. base = b;
w. source = qid;
t. doExec ( ) ;
w. source = src;
}
}
break ;
}
else {
r += step;
-- n;
}
}
if ( ( s = task. status) < 0 )
break ;
else if ( n == 0 ) {
long ms, ns; int block;
if ( deadline == 0L )
ms = 0L ;
else if ( ( ns = deadline - System . nanoTime ( ) ) <= 0L )
break ;
else if ( ( ms = TimeUnit . NANOSECONDS . toMillis ( ns) ) <= 0L )
ms = 1L ;
if ( ( block = tryCompensate ( w) ) != 0 ) {
task. internalWait ( ms) ;
CTL . getAndAdd ( this , ( block > 0 ) ? RC_UNIT : 0L ) ;
}
s = task. status;
}
}
}
return s;
}
上面的方法有个关键点:里面的循环通常是死循环,并且只有一个返回点,即只有在task.status<0,任务完成之后才 可能返回,否则一般会不断自旋,若自旋之后还不行,就会调用task.internalWait(ms);阻塞
task.internalWait(ms);的代码如下:
final void internalWait ( long timeout) {
if ( ( int ) STATUS . getAndBitwiseOr ( this , SIGNAL ) >= 0 ) {
synchronized ( this ) {
if ( status >= 0 )
try { wait ( timeout) ; } catch ( InterruptedException ie) { }
else
notifyAll ( ) ;
}
}
}
join的唤醒:
调用t.join()之后,线程会被阻塞,接下来看另外一个线程在任务t执行完毕后如何唤醒阻塞的线程
final int doExec ( ) {
int s; boolean completed;
if ( ( s = status) >= 0 ) {
try {
completed = exec ( ) ;
} catch ( Throwable rex) {
completed = false ;
s = setExceptionalCompletion ( rex) ;
}
if ( completed)
s = setDone ( ) ;
}
return s;
}
private int setDone ( ) {
int s;
if ( ( ( s = ( int ) STATUS . getAndBitwiseOr ( this , DONE ) ) & SIGNAL ) != 0 )
synchronized ( this ) { notifyAll ( ) ; }
return s | DONE ;
}
任务的执行发⽣在doExec()方法里面,任务执行完成后,调用一个setDone()通知所有等待的线程,这里也做 了两件事:
1:把status置为完成状态
2:一般来说如果s != 0(一般s,即status是正整数,当然他也包括0),即 s = SIGNAL,说明有线程正在等待这个任务执行完成,调用Java原⽣的notifyAll()通知所有 线程,如果s = 0,说明没有线程等待这个任务,不需要通知
ForkJoinPool的优雅关闭:
同ThreadPoolExecutor一样,ForkJoinPool的关闭也不可能是"瞬时的",也是需要一个平滑的过渡过程
工作线程的退出:
对于一个Worker线程来说(线程池基本都有这样),它会在一个for循环里面不断轮询队列中的任务,如果有任务,则执行,处在活跃 状态,如果没有任务,则进入空闲等待状态
这个线程如何退出呢?
final void runWorker ( WorkQueue w) {
int r = ( w. id ^ ThreadLocalRandom . nextSecondarySeed ( ) ) | FIFO ;
w. array = new ForkJoinTask < ? > [ INITIAL_QUEUE_CAPACITY ] ;
for ( ; ; ) {
int phase;
if ( scan ( w, r) ) {
r ^= r << 13 ; r ^= r >>> 17 ; r ^= r << 5 ;
}
else if ( ( phase = w. phase) >= 0 ) {
long np = ( w. phase = ( phase + SS_SEQ ) | UNSIGNALLED ) & SP_MASK ;
long c, nc;
do {
w. stackPred = ( int ) ( c = ctl) ;
nc = ( ( c - RC_UNIT ) & UC_MASK ) | np;
} while ( ! CTL . weakCompareAndSet ( this , c, nc) ) ;
}
else {
int pred = w. stackPred;
Thread . interrupted ( ) ;
w. source = DORMANT ;
long c = ctl;
int md = mode, rc = ( md & SMASK ) + ( int ) ( c >> RC_SHIFT ) ;
if ( md < 0 )
break ;
else if ( rc <= 0 && ( md & SHUTDOWN ) != 0 &&
tryTerminate ( false , false ) )
break ;
else if ( rc <= 0 && pred != 0 && phase == ( int ) c) {
long nc = ( UC_MASK & ( c - TC_UNIT ) ) | ( SP_MASK & pred) ;
long d = keepAlive + System . currentTimeMillis ( ) ;
LockSupport . parkUntil ( this , d) ;
if ( ctl == c &&
d - System . currentTimeMillis ( ) <= TIMEOUT_SLOP &&
CTL . compareAndSet ( this , c, nc) ) {
w. phase = QUIET ;
break ;
}
}
else if ( w. phase < 0 )
LockSupport . park ( this ) ;
w. source = 0 ;
}
}
}
一般来说(int) (c = ctl) < 0,即最终导致的原因会是低32位的最高位为1导致的,具体为什么并没有具体说明(这里给出前面的一个说明:ST:1个比特位,如果是1,表示整个ForkJoinPool正在关闭),说明线程池已经进入了关闭状态,但线程池进入关闭状态,不代表 所有的线程都会立⻢关闭
对应的ForkJoinPool类的shutdown()与shutdownNow()的区别:
public void shutdown ( ) {
checkPermission ( ) ;
tryTerminate ( false , true ) ;
}
public List < Runnable > shutdownNow ( ) {
checkPermission ( ) ;
tryTerminate ( true , true ) ;
return Collections . emptyList ( ) ;
}
二者的代码基本相同,都是调用tryTerminate(boolean, boolean)方法,其中一个传入的是false,另一个传入 的是true,tryTerminate意为试图关闭ForkJoinPool,但并不保证一定可以关闭成功(所以有循环,实际上就算是循环可能也关闭不了,但是一般不会,否则是方法的问题,那么自然也不会直接的给我们使用的)
private boolean tryTerminate ( boolean now, boolean enable) {
int md;
while ( ( ( md = mode) & SHUTDOWN ) == 0 ) {
if ( ! enable || this == common)
return false ;
else
MODE . compareAndSet ( this , md, md | SHUTDOWN ) ;
}
while ( ( ( md = mode) & STOP ) == 0 ) {
if ( ! now) {
for ( long oldSum = 0L ; ; ) {
boolean running = false ;
long checkSum = ctl;
WorkQueue [ ] ws = workQueues;
if ( ( md & SMASK ) + ( int ) ( checkSum >> RC_SHIFT ) > 0 )
running = true ;
else if ( ws != null ) {
WorkQueue w;
for ( int i = 0 ; i < ws. length; ++ i) {
if ( ( w = ws[ i] ) != null ) {
int s = w. source, p = w. phase;
int d = w. id, b = w. base;
if ( b != w. top ||
( ( d & 1 ) == 1 && ( s >= 0 || p >= 0 ) ) ) {
running = true ;
break ;
}
checkSum += ( ( ( long ) s << 48 ) + ( ( long ) p << 32 ) +
( ( long ) b << 16 ) + ( long ) d) ;
}
}
}
if ( ( ( md = mode) & STOP ) != 0 )
break ;
else if ( running)
return false ;
else if ( workQueues == ws && oldSum == ( oldSum = checkSum) )
break ;
}
}
if ( ( md & STOP ) == 0 )
MODE . compareAndSet ( this , md, md | STOP ) ;
}
while ( ( ( md = mode) & TERMINATED ) == 0 ) {
for ( long oldSum = 0L ; ; ) {
WorkQueue [ ] ws; WorkQueue w;
long checkSum = ctl;
if ( ( ws = workQueues) != null ) {
for ( int i = 0 ; i < ws. length; ++ i) {
if ( ( w = ws[ i] ) != null ) {
ForkJoinWorkerThread wt = w. owner;
w. cancelAll ( ) ;
if ( wt != null ) {
try {
wt. interrupt ( ) ;
} catch ( Throwable ignore) {
}
}
checkSum += ( ( long ) w. phase << 32 ) + w. base;
}
}
}
if ( ( ( md = mode) & TERMINATED ) != 0 ||
( workQueues == ws && oldSum == ( oldSum = checkSum) ) )
break ;
}
if ( ( md & TERMINATED ) != 0 )
break ;
else if ( ( md & SMASK ) + ( short ) ( ctl >>> TC_SHIFT ) > 0 )
break ;
else if ( MODE . compareAndSet ( this , md, md | TERMINATED ) ) {
synchronized ( this ) {
notifyAll ( ) ;
}
break ;
}
}
return true ;
}
线程在执行时,对应的方法是给出一份的,也就是说,方法可以多人进入,而对于变量,但是如果是局部的,而不是堆(使用同一个对象)或者方法区(静态)的,那么一般是不共享的
总结:shutdown()只拒绝新提交的任务(对应的if (!now) { 是true,因为 tryTerminate(false, true);是shutdown方法的),shutdownNow()会取消现有的全局队列和局部队列中的任务,同时 唤醒所有空闲的线程,让这些线程自动退出(对应的if (!now) { 是false,因为tryTerminate(true, true);是shutdownNow方法的)
至此线程池的一些知识大致说明完毕
多线程设计模式(这里最好了解 ,大多数的JUC里面的类或者接口,一般多是类,都使用到了多线程设计模式,比如后面我们手写的读写锁就满足了对应的模式,并在相应的后面,也会给出JUC里面的类似使用该模式的类例子,如ReentrantReadWriteLock类):
Single Threaded Execution(英文意思:单线程执行)模式:
所谓Single Threaded Execution模式,指的是"以一个线程执行",该模式用于设置限制,以确保同一时间只能 让一个线程执行处理
Single Threaded Execution有时也称为临界区(critical section)或临界域(critical region),Single Threaded Execution名称侧重于执行处理的线程(一个线程),而别称临界区或临界域侧重于执行范围,即执行安全,简单来说就是在某个地方进行单线程执行,如后面加锁的方法
示例程序:
package main8 ;
public class NumberResource {
private Integer number = 0 ;
private Integer printIndex = 0 ;
private void add ( ) {
this . number++ ;
}
private Integer get ( ) {
return this . number;
}
public void showNumber ( ) throws InterruptedException {
if ( printIndex > 100 ) System . exit ( 0 ) ;
Integer number1 = this . get ( ) ;
this . add ( ) ;
Thread . sleep ( 5 ) ;
Integer number2 = this . get ( ) ;
if ( ( number1 + 1 ) == number2) {
System . out. println ( Thread . currentThread ( ) . getName ( ) + " => 递增--正确- -:" + number1 + " ***** " + number2) ;
} else {
System . out. println ( Thread . currentThread ( ) . getName ( ) + " => 递增**异常 **:" + number1 + " ***** " + number2) ;
}
printIndex++ ;
}
}
package main8 ;
public class UserThread extends Thread {
private NumberResource resource;
public UserThread ( NumberResource resource) {
this . resource = resource;
}
@Override
public void run ( ) {
while ( true ) {
try {
resource. showNumber ( ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
}
}
package main8 ;
public class ConcurrentDemo {
public static void main ( String [ ] args) {
NumberResource resource = new NumberResource ( ) ;
new UserThread ( resource) . start ( ) ;
new UserThread ( resource) . start ( ) ;
}
}
上述代码会出现递增异常,之所以递增异常,是因为showNumber方法是一个临界区,其中对数字加一,但又不能保证原子 性(你加我也加,那么自然递增异常),在多线程执行的时候,就会出现问题
线程安全的NumberResource类:
public synchronized void showNumber ( ) throws InterruptedException {
Single Threaded Execution模式总结:
SharedResource(共享资源):
Single Threaded Execution模式中出现了一个发挥SharedResource(共享资源)作用的类,在示例程序中, 由NumberResource类扮演SharedResource角色,如操作里面的对应的number变量和printIndex变量,虽然他一般代表操作锁的共享资源
SharedResource角色是可以被多个线程访问的类,包含很多方法,但这些方法主要分为如下两类:
safeMethod(英文意思:安全方法):多个线程同时调用也不会发⽣问题的方法(如上面加上了synchronized的方法)
unsafeMethod(英文意思:不安全方法):多个线程同时访问会发⽣问题,因此必须加以保护的方法(如上面没有加上synchronized的方法)
safeMethod,无需考虑分歧或者什么问题
而对于unsafeMethod,在被多个线程同时执行时,实例状态有可能发⽣分歧,这时就需要保护该方法,使其不 被多个线程同时访问
Single Threaded Execution模式会保护unsafeMethod,使其同时只能由一个线程访问,一般java通常是通过unsafeMethod声明为synchronized方法来进行保护(使得变成安全方法),上面也是这样做的
我们将只允许单个线程执行的程序范围称为临界区
Single Threaded Execution类图:
上面的指向只是一个名称,代表使得可以是单个线程的操作
何时使用Single Threaded Execution模式:
1:多线程时
在单线程程序中使用synchronized关键字并不会破坏程序的安全性,但是调用synchronized方法要比调用一 般方法花费时间,稍微降低程序性能
2:多个线程访问时(可以认为"多线程时"包括"多个线程访问")
当SharedResource角色的实例有可能被多个线程同时访问时,就需要使用Single Threaded Execution模式
即便是多线程程序,如果所有线程都是完全独立操作的(安全或者分开),也无需使用Single Threaded Execution模式,这种 状态称为线程互不⼲涉
在某些处理多个线程的框架中,有时线程的独立性是由框架控制的(具体可以自己百度,比如线程安全的集合等待,安全,或者线程池,分开),此时,框架的使用者就无需考虑是否使用Single Threaded Execution模式
3:状态有可能变化时
之所以需要使用Single Threaded Execution模式,是因为SharedResource角色的状态会发⽣变化(整个状态,我们也可以认为是对他方法操作安全,防止其他人改变,如对应的变量)
如果在创建实例后,实例的状态再也不发⽣变化,就无需使用Single Threaded Execution模式
4:需要确保安全性时
只有在需要确保安全性时(需要安全性),才需要使用Single Threaded Execution模式
Java的集合类大多数都是⾮线程安全的,这是为了在不需要考虑安全性的时候提高程序运行速度
用户在使用类时,需要考虑自己要用的类是否是线程安全的,来决定是否使用该模式
死锁:
使用Single Threaded Execution模式时,存在发⽣死锁的危险
死锁是指两个线程分别持有锁,并相互等待对方释放锁的现象,发⽣死锁的线程都无法再继续运行,程序卡 死
两个人吃饭,都需要⼑和叉,但⼑叉又只有一套,某时刻,其中一个人拿了⼑,另一个拿了叉,而且两人都在 等待对方让出自己需要的叉或⼑,这种情形下,两个人都只能一直等待下去,这就是发⽣了死锁
在Single Threaded Execution模式中,满⾜下列条件时,会发⽣死锁:
存在多个SharedResource角色(可以是两个类,类里面可以包含多个变量,这里代表锁的共享资源)
线程在持有某个SharedResource角色锁的同时,还想获取其他SharedResource角色的锁
获取SharedResource角色的锁的顺序不固定(SharedResource角色是对称的,我中有你,你中有我)
临界区的大小和性能:
一般情况下,Single Threaded Execution模式会降低程序性能:
1:获取锁花费时间
进入synchronized方法时,线程需要获取对象的锁,该处理会花费时间
如果SharedResource角色的数量减少了,那么要获取的锁的数量也会相应地减少,从而就能够抑制性能的下 降了
2:线程冲突引起的等待
当线程执行临界区内的处理时,其他想要进入临界区的线程会阻塞,这种状况称为线程冲突,发⽣冲突时,程 序的整体性能会随着线程等待时间的增加而下降
总结:简单来说,该模式就相当于一个用一个共享资源(共享的,一般我们也利用他里面的变量,一般即他是共享,也执行他的方法,虽然这里与线程类分开了)来操作锁的模式,所以不是单纯的操作加锁,更简单点:就是对一个可以是多个线程同时进入的地方加上锁的模式
Immutable(英文意思:不变的)模式:
Immutable就是不变的、不发⽣改变,Immutable模式中存在着确保实例状态不发⽣改变的类,在访问这些实 例时不需要执行耗时的互斥处理,如果能用好该模式,就可以提高程序性能
如String就是一个不可变类,immutable的,即对应的值是存在的,只是我们没有指向他了而已,可以看这个博客:https://blog.csdn.net/zhi_sun/article/details/119169738,第20章博客也说明了,一般代表常量,所以不可变(单纯的,不是创建对象,虽然创建对象也算,因为他是一个该模式的类,而单纯的赋值实际上相当于是他进行特殊处理了而已,使得放的位置不同,总体来说源头还是String类)
示例程序:
package main9 ;
public class User {
private final Integer userId;
private final String username;
private final String desc;
public User ( Integer userId, String username, String desc) {
this . userId = userId;
this . username = username;
this . desc = desc;
}
public Integer getUserId ( ) {
return userId;
}
public String getUsername ( ) {
return username;
}
public String getDesc ( ) {
return desc;
}
@Override
public String toString ( ) {
return "User{" +
"userId=" + userId +
", username='" + username + '\'' +
", desc='" + desc + '\'' +
'}' ;
}
}
package main9 ;
public class UserThread extends Thread {
private Integer index = 0 ;
private User user;
public UserThread ( User user) {
this . user = user;
}
@Override
public void run ( ) {
while ( true ) {
if ( index >= 100 ) {
System . exit ( 0 ) ;
}
System . out. println ( Thread . currentThread ( ) . getName ( ) + " ===> " + user) ;
index++ ;
}
}
}
package main9 ;
public class ConcurrentDemo {
public static void main ( String [ ] args) throws InterruptedException {
User user = new User ( 1001 , "张三" , "张三是一个好人" ) ;
new UserThread ( user) . start ( ) ;
new UserThread ( user) . start ( ) ;
new UserThread ( user) . start ( ) ;
new UserThread ( user) . start ( ) ;
new UserThread ( user) . start ( ) ;
}
}
很明显,由于操作的是自己对象的变量,并不是共享的,那么对于他来说没有发生改变,因为没有其他人改变他,所以这里对于之前的在Single Threaded Execution模式中,将修改或引用实例状态的地方设置为临界区,该区只能由一个线程执 行,而对于本案例的User类,实例的状态绝对不会发⽣改变,甚至没有操作改变(就算你去操作,由于关键字的原因,也不能改变),所以即使多个线程同时对该实例执行处理,实例也不会出 错,因为实例的状态不变,如此也无需使用synchronized关键字来保护实例,比如下图中的例子,规定不改变
Immutable模式中的角色:
Immutable:
Immutable角色是一个类,该角色中的字段值不可修改,也不存在修改字段内容的方法,无需对Immutable角 色应用Single Threaded Execution模式,无需使用synchronized关键字,就是本案例的User类
何时使用Immutable模式:
1:必须是实例创建后,状态不再发⽣变化的,实例的状态由字段的值决定,即使字段是final的且不存在setter, 但是也有可能不是不可变的,因为字段引用的实例有可能发⽣变化,这样与变量的final是无关的,我们只是改变起指向的对象而已,但没有改变该变量,但间接的改变了该变量了
2:实例是共享的,且被频繁访问时
Immutable模式的优点是不需要使用synchronized关键字进行保护,意味着在不失去安全性和⽣存性的前提下 提高性能,当实例被多个线程共享,且有可能被频繁访问时,Immutable模式优点明显
注意:
StringBuffer类表示字符串的可变类(比如可以进行append追加),String类表示字符串的不可变类,String实例表示的字符串不可以修改, 执行操作的方法都不是synchronized修饰的,引用速度更快
如果需要频繁修改字符串内容,则使用StringBuffer,如果不需要修改字符串内容,只是引用内容,则使用String
JDK中的不可变模式类:
java.lang.String,java.math.BigInteger,java.math.Decimal,java.util.regex.Pattern,java.lang.Boolean,java.lang.Byte,java.lang.Character,java.lang.Double,java.lang.Float,java.lang.Integer,java.lang.Long,java.lang.Short,java.lang.Void
可能有其他的类,以及上面可能会随着时间的推移(不同JDk版本)发生改变,但一般是对(不变)的,即主要看JDK版本
总结:就是不操作改变,只给值的模式,所以无论你在多个线程访问时,是否加锁都没有关系,这也验证,线程在得到值时,基本是操作副本的,一般class文件会分开成堆和方法区的(了解即可),我们操作的都是副本(或者说从class文件的副本),所以也验证对应的class只有一个,而我们创建对象时就是拿副本,虽然静态的也是一个,会特殊处理,与class一样,所以class也是可以得到的
Guarded Suspension模式:
Guarded表示被守护、被保卫、被保护,Suspension表示暂停,如果执行现在的处理会造成问题,就让执行 处理的线程进行等待,这就是Guarded Suspension模式
Guarded Suspension模式通过让线程等待来保证实例的安全型,Guarded Suspension也称为guarded wait、spin lock等名称
示例程序:
package main10 ;
public class Request {
private final String name;
public Request ( String name) {
this . name = name;
}
@Override
public String toString ( ) {
return "Request{" +
"name='" + name + '\'' +
'}' ;
}
}
package main10 ;
import java. util. LinkedList ;
import java. util. Queue ;
public class RequestQueue {
private final Queue < Request > queue = new LinkedList < > ( ) ;
public synchronized Request getRequest ( ) {
while ( queue. peek ( ) == null ) {
try {
wait ( ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
return queue. remove ( ) ;
}
public synchronized void putRequest ( Request request) {
queue. offer ( request) ;
notifyAll ( ) ;
}
}
package main10 ;
import java. util. Random ;
public class ClientThread extends Thread {
private final Random random;
private final RequestQueue requestQueue;
public ClientThread ( RequestQueue requestQueue, String name, long seed) {
super ( name) ;
this . requestQueue = requestQueue;
this . random = new Random ( seed) ;
}
@Override
public void run ( ) {
for ( int i = 0 ; i < 10000 ; i++ ) {
Request request = new Request ( "请求:" + i) ;
System . out. println ( Thread . currentThread ( ) . getName ( ) + " 请求 " + request) ;
requestQueue. putRequest ( request) ;
try {
Thread . sleep ( random. nextInt ( 1000 ) ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
}
}
package main10 ;
import java. util. Random ;
public class ServerThread extends Thread {
private final Random random;
private final RequestQueue requestQueue;
public ServerThread ( RequestQueue requestQueue, String name, long seed) {
super ( name) ;
this . requestQueue = requestQueue;
random = new Random ( seed) ;
}
@Override
public void run ( ) {
for ( int i = 0 ; i < 10000 ; i++ ) {
Request request = requestQueue. getRequest ( ) ;
System . out. println ( Thread . currentThread ( ) . getName ( ) + " 处理 " + request) ;
try {
Thread . sleep ( random. nextInt ( 1000 ) ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
}
}
package main10 ;
public class ConcurrentDemo {
public static void main ( String [ ] args) {
RequestQueue requestQueue = new RequestQueue ( ) ;
new ClientThread ( requestQueue, "client-1" , 432432L ) . start ( ) ;
new ServerThread ( requestQueue, "server-1" , 9988766L ) . start ( ) ;
}
}
应用保护条件进行保护:
public synchronized Request getRequest ( ) {
while ( queue. peek ( ) == null ) {
try {
wait ( ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
return queue. remove ( ) ;
}
上面代码中,getRequest方法执行的逻辑是从queue中取出一个Request实例,即 queue.remove() ,但是要获取Request实例,必须满⾜条件: queue.peek() != null ,该条件就是Guarded Suspension模式的守护条件 (guard condition:英文意思:防护条件,所以这里称为守护条件),防止null的时候进行删除,使得出现异常
当线程执行到while语句时:
若守护条件成立,线程不进入while语句块,直接执行queue.remove()方法,线程不会等待
若守护条件不成立,线程进入while语句块,执行wait,开始等待
public synchronized void putRequest ( Request request) {
queue. offer ( request) ;
notifyAll ( ) ;
}
若守护条件不成立,则线程等待,等待什么?等待notifyAll()唤醒该线程
守护条件阻⽌了线程继续向前执行,除⾮实例状态发⽣改变,守护条件成立,被另一个线程唤醒
该类中的synchronized关键字保护的是queue字段,getRequest方法的synchronized保护该方法只能由一个 线程执行(虽然也是阻塞唤醒的条件,因为阻塞唤醒就是针对操作一人的,而不是sleep一样的,即各有所长)
线程执行this.wait之后,进入this的等待队列,并释放持有的this锁
notify、notifyAll或interrupt会让线程退出等待队列,实际继续执行之前还必须再次获取this的锁线程才可以 继续执行
相关的细节说明在前面已经说过了,这里回顾一下而已
时序图(有时候给出,是因为比较复杂的时候给出,或者是基础的给出,其中如果是相对应的后面有类似的可能只给出类图(是可能,并不是一定,所以要注意)):
这个图从上到下的看就可以了(ok代表成功操作,即成功放入或者成功得到)
Guarded Suspension模式中的角色:
GuardedObject(被保护的对象):
GuardedObject角色是一个持有被保护(guardedMethod)的方法的类,当线程执行guardedMethod方法 时,若守护条件成立,立即执行,当守护条件不成立,则等待,守护条件是可以随着GuardedObject角色的状态不同而变的(因为是使用对应的变量的)
除了guardedMethod之外,GuardedObject角色也可以持有其他改变实例状态(stateChangingMethod,带这个括号的,一般代表该别名,即这里是实例状态)的 方法,那么实际上Single Threaded Execution模式也可以,只需要共享资源的类来实现改变其他状态即可,但是对应的线程只是基本操作他而已,所以并没有意义
java中,guardedMethod通过while语句和wait方法来实现,stateChangingMethod通过notify/notifyAll方法 实现(不阻塞,使得进行出队)
在本案例中,RequestQueue为GuardedObject,getRequest方法为guardedMethod(也可以是stateChangingMethod,因为是"除了guardedMethod之外",上面的话术说明),putRequest为stateChangingMethod
可以将Guarded Suspension理解为多线程版本的if,满足条件进入,否则不进入
我们也可以使用LinkedBlockingQueue替代对应的队列(自带的阻塞队列,那么我们就不用自己实现阻塞和唤醒了,前面说明过他了)
package main10 ;
import java. util. concurrent. LinkedBlockingQueue ;
public class RequestQueue {
private final LinkedBlockingQueue < Request > queue = new LinkedBlockingQueue < > ( ) ;
public Request getRequest ( ) {
Request request = null ;
try {
request = queue. take ( ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
return request;
}
public void putRequest ( Request request) {
try {
queue. put ( request) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
}
类图:
虽然之前用的是不安全的,但是由于我们操作了锁,且只有一人(一个线程,对应的一个消费者或者生产者,是或者,他们并没有一起执行,因为是同一个this,即同一个引用,若他们一起可能就有问题了,因为是线程不安全的,那么在指向上可能会覆盖,使得没有删除或者没有添加对应的同一个位置,这是多线程的一个大众问题),所以基本不会出现问题
总结:就是在多线程中进行判断,使得不会出现问题或者避免出现问题的模式
Balking模式:
所谓Balk,就是停⽌并返回的意思
Balking模式与Guarded Suspension模式一样,也存在守护条件,在Balking模式中,如果守护条件不成立, 则立即中断处理,而Guarded Suspension模式一直等待直到可以运行
示例程序:
这里简单说明一下流程(因为这里可能会复杂点):
两个线程,一个是修改线程,修改之后,等待随机时长,保存文件内容
另一个是保存线程,固定时长进行文件内容的保存(一开始不等,之后就按照固定时长了,所以打印时第一个基本都会出现"2不需要保存"的内容),如果文件需要保存,则执行保存动作,如果文件不需要保存,则不执行保存动作
package main11 ;
import java. io. FileWriter ;
import java. io. IOException ;
import java. io. Writer ;
public class Data {
private final String filename;
private String content;
private boolean changed;
public Data ( String filename, String content) {
this . filename = filename;
this . content = content;
}
public synchronized void change ( String newContent) {
this . content = newContent;
this . changed = true ;
}
public synchronized void save ( ) throws IOException {
if ( changed) {
doSave ( ) ;
changed = false ;
} else {
System . out. println ( Thread . currentThread ( ) . getName ( ) + "不需要保存" ) ;
}
}
private void doSave ( ) throws IOException {
System . out. println ( Thread . currentThread ( ) . getName ( ) + " 调用doSave,内容为:" + content) ;
Writer writer = new FileWriter ( filename) ;
writer. write ( content) ;
writer. close ( ) ;
}
}
package main11 ;
import java. io. IOException ;
public class SaverThread extends Thread {
private final Data data;
public SaverThread ( String name, Data data) {
super ( name) ;
this . data = data;
}
@Override
public void run ( ) {
while ( true ) {
try {
data. save ( ) ;
Thread . sleep ( 1000 ) ;
} catch ( IOException e) {
e. printStackTrace ( ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
}
}
package main11 ;
import java. io. IOException ;
import java. util. Random ;
public class ChangerThread extends Thread {
private final Data data;
private final Random random = new Random ( ) ;
public ChangerThread ( String name, Data data) {
super ( name) ;
this . data = data;
}
@Override
public void run ( ) {
for ( int i = 0 ; true ; i++ ) {
data. change ( "第 " + i + " 次修改" ) ;
try {
Thread . sleep ( random. nextInt ( 2000 ) ) ;
data. save ( ) ;
} catch ( IOException e) {
e. printStackTrace ( ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
}
}
package main11 ;
public class ConcurrentDemo {
public static void main ( String [ ] args) {
Data data = new Data ( "F:/run.txt" , "嘿嘿" ) ;
new ChangerThread ( "1" , data) . start ( ) ;
new SaverThread ( "2" , data) . start ( ) ;
}
}
很明显,中断处理就是没有执行对应的方法,而不是等待执行
Balking模式中的角色:
GuardedObject(受保护对象):
GuardedObject角色是一个拥有被保护的方法(guardedMethod)的类,当线程执行guardedMethod时,若 保护条件成立,则执行实际的处理,若不成立,则不执行实际的处理,直接返回
保护条件的成立与否随着GuardedObject角色状态的改变而变动
除了guardedMethod之外,GuardedObject角色还有可能有其他改变状态的方法 (stateChangingMethod)
在此案例中,Data类对应于GuardedObject,save方法对应guardedMethod(也可以是stateChangingMethod),change方法对应stateChangingMethod方法
保护条件是changed字段为true(这里就提一下了)
类图:
何时使用Balking模式(前面的Guarded Suspension模式好像没有这样的说明,即并不是非常重要的,或者是有这个类似的,才没有说明):
1:不需要执行时
在此示例程序中,content字段的内容如果没有修改,就将save方法balk,之所以要balk,是因为content已经 写文件了,无需再写了(这个意思代表,如果我没有修改的话,说明已经写入一次了,虽然第一次也出现了,但是这也只是第一次而已,之后基本就是这样的),所以,在这种情况下,如果并不需要执行某些代码,就可以使用Balking模式,此时可以提高程序性能,而不是等待执行(阻塞唤醒是需要时间的,虽然这里是不使用的,如果要使用需要考虑很多问题,只需要写入后进行设置即可)
2:不需要等待守护条件成立时
Balking模式的特点就是不等待,若条件成立,就执行,若不成立,就不执行,立即进入下一个操作
3:守护条件仅在第一次成立时
当"守护条件仅在第一次成立或者需要第一次操作"时,可以使用Balking模式
比如各种类的初始化操作,检查一次是否初始化了,如果初始化了,就不用执行了,如果没有初始化,则进行 初始化
balk结果的表示:
1:忽略balk
最简单的方式就是不通知调用端"发⽣了balk",示例程序采用的就是这种方式,即没有操作返回值
2:通过返回值表示balk
可以通过boolean值表示balk,若返回true,表示未发⽣balk,需要执行并执行了处理,若false,则表示发⽣了balk,处理已执行,不再需要执行,有时也会使用null来表示"发⽣了balk",不只是false一种哦,具体可以自己设置,比如0,1也行,这要看自己
3:通过异常表示balk
有时也通过异常表示"发⽣了balk",即,当balk时,程序并不从方法return,而是抛异常
Balking和Guarded Suspension模式之间:
介于"直接balk并返回"和"等待到守护条件成立为⽌"这两种极端之间的还有一种"在守护条件成立之前等待一段 时间",在守护条件成立之前等待一段时间,如果到时条件还未成立,则直接balk
这种操作称为计时守护(guarded timed)或超时(timeout)
java.util.concurrent中的超时:
1:通过异常通知超时
一般当发⽣超时抛出异常时,且不适合使用返回值表示超时,通常需要使用java.util.concurrent.TimeoutException异 常,当然也可以自己操作异常,但是一般是这个(现在JDK自带的一般操作他,因为总要有个共享的异常吧,为什么不是他呢)
如:
java.util.concurrent.Future的get方法
java.util.concurrent.Exchanger的exchange方法
java.util.concurrent.Cyclicarrier的await方法
java.util.concurrent.CountDownLatch的await方法
具体可能不会这样,但是现在一般是正确的,可能还有其他,主要看JDK版本吧
2:通过返回值通知超时
当执行多次try时,则不使用异常,而使用返回值表示超时
如:
java.util.concurrent.BlockingQueue接口,当offer方法的返回值为false,或poll方法的返回值为null,表示发 ⽣了超时
java.util.concurrent.Semaphore类,当tryAcquire方法的返回值为false时,表示发⽣了超时
java.util.concurrent.locks.Lock接口,当tryLock方法的返回值为false时,表示发⽣了超时
也是看JDK版本
总结:该模式就是Guarded Suspension模式不进行等待的模式,虽然还有他和Guarded Suspension模式之间的特殊操作,但也属于Guarded Suspension模式的说明
Producer-Consumer模式:
⽣产者安全地将数据交给消费者,当⽣产者和消费者以不同的线程运行时,两者之间的处理速度差异会有问题,⽣产者消费者模式用于消除线程间处理速度的差异带来的问题,在该模式中,⽣产者和消费者都有多个,当⽣产者和消费者只有一个时,我们称为管道(Pipe)模式,就类似与前面的Guarded Suspension模式(或者就是他),各自只有一个(但是他们确只能一人进入,但是就算是多个,也是一样的只能一人进入,除非是操作边界通知,"边界通知"在前面说明过了)
示例程序:
package main12 ;
public class Table {
private final String [ ] buffer;
private int tail;
private int head;
private int count;
public Table ( int count) {
this . buffer = new String [ count] ;
this . head = 0 ;
this . tail = 0 ;
this . count = 0 ;
}
public synchronized void put ( String steamedBread) throws InterruptedException {
System . out. println ( Thread . currentThread ( ) . getName ( ) + " 蒸出来 " + steamedBread) ;
while ( count >= buffer. length) {
wait ( ) ;
}
buffer[ tail] = steamedBread;
tail = ( tail + 1 ) % buffer. length;
count++ ;
notifyAll ( ) ;
}
public synchronized String take ( ) throws InterruptedException {
while ( count <= 0 ) {
wait ( ) ;
}
String steamedBreak = buffer[ head] ;
head = ( head + 1 ) % buffer. length;
count-- ;
notifyAll ( ) ;
System . out. println ( Thread . currentThread ( ) . getName ( ) + " 取⾛ " + steamedBreak) ;
return steamedBreak;
}
}
package main12 ;
import java. util. Random ;
public class CookerThread extends Thread {
private final Random random;
private final Table table;
private static int id = 0 ;
public CookerThread ( String name, Table table, long seed) {
super ( name) ;
this . table = table;
this . random = new Random ( seed) ;
}
@Override
public void run ( ) {
while ( true ) {
try {
Thread . sleep ( random. nextInt ( 1000 ) ) ;
String steamedBread = "[ Steamed bread No. " + nextId ( ) + " by " + getName ( ) + "]" ;
table. put ( steamedBread) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
}
private static synchronized int nextId ( ) {
return id++ ;
}
}
package main12 ;
import java. util. Random ;
public class EaterThread extends Thread {
private final Random random;
private final Table table;
public EaterThread ( String name, Table table, long seed) {
super ( name) ;
this . table = table;
this . random = new Random ( seed) ;
}
@Override
public void run ( ) {
try {
while ( true ) {
String steamedBread = table. take ( ) ;
Thread . sleep ( random. nextInt ( 1000 ) ) ;
}
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
}
package main12 ;
public class ConcurrentDemo {
public static void main ( String [ ] args) {
Table table = new Table ( 3 ) ;
new CookerThread ( "厨师-1" , table, 12345L ) . start ( ) ;
new CookerThread ( "厨师-2" , table, 23456L ) . start ( ) ;
new CookerThread ( "厨师-3" , table, 34567L ) . start ( ) ;
new EaterThread ( "饭桶-1" , table, 45678L ) . start ( ) ;
new EaterThread ( "饭桶-2" , table, 56789L ) . start ( ) ;
new EaterThread ( "饭桶-3" , table, 67890L ) . start ( ) ;
}
}
之前有个类似的即多个消费者的地方的代码(前面有过说明多个消费者或者多个生产者的地方,比如:边界通知,比如"我们给出了疑问:“这里在后面会进行说明”",当然这些地方都进行了说明,一般还有其他地方,这里就不多说了),简单来说就是操作多个生成(产)者和多个消费者的模式
关于put方法:
put方法会抛出InterruptedException异常(中断的异常),如果抛出,可以理解为"该操作已取消"
put方法使用了Guarded Suspension模式
tail和count的更新采取buffer环的形式(1,2,3,1,2,3这样的顺序,前面也有过类似的案例,比如MyQueue类)
notifyAll方法唤醒正在等待馒头(这里将生产的东西简称为馒头了)的线程来吃
关于take方法:
take方法会抛出InterruptedException异常,表示该操作已取消
take方法采用了Guarded Suspension模式
head和count的更新采用了buffer环的形式
notifyAll唤醒等待的厨子线程开始蒸馒头
时序图:
Producer-Consumer模式中的角色:
1:Data(英文意思:数据)
Data角色由Producer角色⽣成,供Consumer角色使用,在本案例中,String类的馒头对应于Data角色
2:Producer
Producer角色⽣成Data角色,并将其传递给Channel角色,本案例中,CookerThread对应于Producer角色
3:Consumer
Consumer角色从Channel角色获取Data角色并使用,本案例中,EaterThread对应于Consumer角色
4:Channel角色
Channel角色管理从Producer角色获取的Data角色,还负责响应Consumer角色的请求,传递Data角色,为 了安全,Channel角色会对Producer角色和Consumer角色进行互斥处理
当producer角色将Data角色传递给Channel角色时,如果Channel角色状态不能接收Data角色,则Producer角色将一直等待,直到Channel可以接收Data角色为⽌,当Consumer角色从Channel角色获取Data角色时,如果Channel角色状态没有可以传递的Data角色,则Consumer角色将一直等待,直到Channel角色状态转变为可以传递Data角色为⽌,当存在多个Producer角色和Consumer角色时,Channel角色需要对它们做互斥处理,很明显对应的Table就是该角色(包含Date角色)
类图:
守护安全性的Channel角色(可复用):
在⽣产者消费者模型中,承担安全守护责任的是Channel角色,Channel角色执行线程间的互斥处理,确保Producer角色正确地将Data角色传递给Consumer角色
不要直接传递:
Consumer角色想要获取Data角色,通常是因为想使用这些Data角色来执行某些处理,如果Producer角色直 接调用Consumer的方法,执行处理的就不是Consumer的线程,而是Producer角色的线程了,这样一来,异步处 理变同步处理(虽然也是,因为加锁,但是若是边界通知那么是这样,虽然这里不是,即这里需要考虑边界通知),自然会发⽣不同Data间的延迟,降低程序的性能
传递Data角色的顺序可以有如下:
1:队列——先⽣产先消费(上面的代码本质上应该是这个,因为是一路过去的,自然是先生产先消费)
2:栈——先⽣产后消费
3:优先队列——"优先"的先消费(某些比较)
Channel意义:
线程的协调要考虑"放在中间的东⻄"(对应的位置移动以及条件)
线程的互斥要考虑"应该保护的东⻄"(对应的锁)
为了让线程协调运行,必须执行互斥处理,以防⽌共享的内容被破坏,线程的互斥处理时为了线程的协调运行 而执行的
JUC包和Producer-Consumer模式:
JUC中提供了BlockingQueue接口及其实现类,相当于Producer-Consumer模式中的Channel角色
BlockingQueue接口——阻塞队列
ArrayBlockingQueue——基于数组的BlockingQueue
LinkedBlockingQueue——基于链表的BlockingQueue
PriorityBlockingQueue——带有优先级的BlockingQueue
DelayQueue——一定时间之后才可以take的BlockingQueue
SynchronousQueue——直接传递的BlockingQueue
ConcurrentLinkedQueue——元素个数没有最大限制的线程安全队列
上面的队列在前面也大致说明过了
总结:就是完成操作多个生产者和多个消费者的模式
Read-Write Lock模式:
当线程读取实例的状态时,实例的状态不会发⽣变化,实例的状态仅在线程执行写入操作时才会发⽣变化
从实例状态变化来看,读取和写入有本质的区别
在本模式中,读取操作和写入操作分开考虑,在执行读取操作之前,线程必须获取用于读取的锁,在执行写入 操作之前,线程必须获取用于写入的锁
可以多个线程同时读取,读取时不可写入,当线程正在写入时,其他线程不可以读取或写入(注意:包含读取哦,因为写入是改变的,是特殊的,相当于读写锁)
一般来说,执行互斥会降低程序性能,如果把写入的互斥和读取的互斥分开考虑,则可以提高性能,相当于不用考虑对方或者大概率使用没有互斥的一方,自然相对应的对方代码就少(之所以有(就少,说明有联系),是因为可能需要判断是否是写还是读)或者没有,以及可能互斥的少(因为很多时候可能是使用没有互斥的地方,如读读),自然会提高效率
现在我们来完成这个模式,或者说,手写一个读写锁
示例程序:
package main13 ;
public class Data {
private final char [ ] buffer;
private final ReadWriteLock lock = new ReadWriteLock ( ) ;
public Data ( int size) {
this . buffer = new char [ size] ;
for ( int i = 0 ; i < buffer. length; i++ ) {
buffer[ i] = '*' ;
}
}
public char [ ] read ( ) throws InterruptedException {
lock. readLock ( ) ;
try {
return doRead ( ) ;
} finally {
lock. readUnlock ( ) ;
}
}
public void write ( char c) throws InterruptedException {
lock. writeLock ( ) ;
try {
doWrite ( c) ;
} finally {
lock. writeUnlock ( ) ;
}
}
private char [ ] doRead ( ) {
char [ ] newbuf = new char [ buffer. length] ;
for ( int i = 0 ; i < buffer. length; i++ ) {
newbuf[ i] = buffer[ i] ;
}
slowly ( ) ;
return newbuf;
}
private void doWrite ( char c) {
for ( int i = 0 ; i < buffer. length; i++ ) {
buffer[ i] = c;
slowly ( ) ;
}
}
private void slowly ( ) {
try {
Thread . sleep ( 50 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
}
package main13 ;
public class ReaderThread extends Thread {
private final Data data;
public ReaderThread ( Data data) {
this . data = data;
}
@Override
public void run ( ) {
try {
while ( true ) {
char [ ] readbuf = data. read ( ) ;
System . out. println ( Thread . currentThread ( ) . getName ( ) + " 读取了 " + String . valueOf ( readbuf) ) ;
}
} catch ( InterruptedException e) {
}
}
}
package main13 ;
import java. util. Random ;
public class WriterThread extends Thread {
private static final Random RANDOM = new Random ( ) ;
private final Data data;
private final String filler;
private int index = 0 ;
public WriterThread ( Data data, String filler) {
this . data = data;
this . filler = filler;
}
@Override
public void run ( ) {
try {
while ( true ) {
char c = nextchar ( ) ;
data. write ( c) ;
Thread . sleep ( RANDOM . nextInt ( 3000 ) ) ;
}
} catch ( InterruptedException e) {
}
}
private char nextchar ( ) {
char c = filler. charAt ( index) ;
index++ ;
if ( index >= filler. length ( ) ) {
index = 0 ;
}
return c;
}
}
package main13 ;
public class ReadWriteLock {
private int readingReaders = 0 ;
private int waitingWriters = 0 ;
private int writingWriters = 0 ;
private boolean preferWriter = true ;
public synchronized void readLock ( ) throws InterruptedException {
while ( writingWriters > 0 || ( preferWriter && waitingWriters > 0 ) ) {
wait ( ) ;
}
readingReaders++ ;
}
public synchronized void readUnlock ( ) {
readingReaders-- ;
preferWriter = true ;
notifyAll ( ) ;
}
public synchronized void writeLock ( ) throws InterruptedException {
waitingWriters++ ;
try {
while ( readingReaders > 0 || writingWriters > 0 ) {
wait ( ) ;
}
} finally {
waitingWriters-- ;
}
writingWriters++ ;
}
public synchronized void writeUnlock ( ) {
writingWriters-- ;
preferWriter = false ;
notifyAll ( ) ;
}
}
package main13 ;
public class ConcurrentDemo {
public static void main ( String [ ] args) {
Data data = new Data ( 10 ) ;
new ReaderThread ( data) . start ( ) ;
new ReaderThread ( data) . start ( ) ;
new ReaderThread ( data) . start ( ) ;
new ReaderThread ( data) . start ( ) ;
new ReaderThread ( data) . start ( ) ;
new ReaderThread ( data) . start ( ) ;
new WriterThread ( data, "ABCDEFGHIJKLMNOPQRSTUVWXYZ" ) . start ( ) ;
new WriterThread ( data, "abcdefghijklmnopqrstuvwxyz" ) . start ( ) ;
}
}
大致流程就是读操作时,写不能操作,但是读可以,而写操作读和写都不可以,最主要的就是阻塞和唤醒之间的细节了,其中保证了阻塞中,对应下一个比然会到唤醒,使得不会都进行阻塞,具体需要自己理解
守护条件:
readLock方法和writeLock方法都是用了Guarded Suspension模式,Guarded Suspension模式的重点是守护 条件
readLock方法:
读取线程⾸先调用readLock方法,当线程从该方法返回,就可以执行实际的读取操作,当线程开始执行实际的读取操作时,只需要判断是否存在正在写入的线程,以及是否存在正在等待的写入线 程, 不考虑读取线程,如果存在正在写入的线程或者存在正在等待的写线程,则等待
writeLock方法:
在线程开始写入之前,调用writeLock方法,当线程从该方法返回后,就可以执行实际的写入操作,开始执行写入的条件:如果有线程正在执行读取操作,出现读写冲突,或者如果有线程正在执行写入的操作, 引起写冲突,当前线程等待
Read-Write Lock模式中的角色:
Reader:
该角色对共享资源角色执行读取操作,在案例中就是ReaderThread类
Writer:
该角色对共享资源角色执行写操作,在案例中就是WriterThread类
SharedResource:
共享资源角色表示Reader角色和Writer角色共享的资源,共享资源角色提供不修改内部状态的操作(读取)和 修改内部状态的操作(写),当前案例中对应于Data类
ReadWriteLock:
写锁角色提供了共享资源角色实现读操作和写操作时需要的锁,即当前案例中的readLock和readUnlock, 以及writeLock和writeUnlock,对应于当前案例中ReadWriteLock类
只是一个图,这里就不给出说明(类图)了
要点:
1:利用读取操作的线程之间不会冲突的特性来提高程序性能
Read-Write Lock模式利用了读操作的线程之间不会冲突的特性,由于读取操作不会修改共享资源的状 态,所以彼此之间无需加锁,因此,多个Reader角色同时执行读取操作,从而提高程序性能
2:适合读取操作负载较大的情况
如果单纯使用Single Threaded Execution模式,则read也只能运行一个线程,如果read负载很重,可以 使用Read-Write Lock模式
3:适合少写多读
Read-Write Lock模式优点是Reader之间不会冲突,如果写入很频繁,Writer会频繁停⽌Reader的处 理,也就无法体现出Read-Write Lock模式的优势了,所以这个时候也就与互斥差不多了
锁的含义:
synchronized可以用于获取实例的锁,java中同一个对象锁不能由两个以上的线程同时获取
用于读取的锁和用于写入的锁与使用synchronized获取的锁是不一样的,开发人员可以通过修改ReadWriteLock类来改变锁的运行(就如他是我们写的一样,自然可以改变)
ReadWriteLock类提供了用于读取的锁和用于写入的锁两个逻辑锁,但是实现这两个逻辑锁的物理锁只有一 个,就是ReadWriteLock实例持有的锁(也就是this,操作synchronized,因为他们之间要互通,自然需要同一个锁,即操作wait和notify(包括notifyAll)的操作)
JUC包和Read-Write Lock模式:
java.util.concurrent.locks包提供了已实现Read-Write Lock模式的ReadWriteLock接口和ReentrantReadWriteLock类
java.util.concurrent.locks.ReadWriteLock接口的功能和当前案例中的ReadWriteLock类类似,不同之处在于 该接口用于读取的锁和用于写入的锁是通过其他对象来实现的
java.util.concurrent.locks.ReentrantReadWriteLock类实现了ReadWriteLock接口,其特征如下:
1:公平性
当创建ReentrantReadWriteLock类(前面好像也大致说明过)的实例时,可以选择锁的获取顺序是否要设置为fair的,如果创建的 实例是公平的,那么等待时间久的线程将可以优先获取锁(够久了终于到我了吧)
2:可重入性
ReentrantReadWriteLock类的锁是可重入的,Reader角色的线程可以获取用于写入的锁,Writer角色的 线程可以获取用于读取的锁(只是重入,并不是一定操作具体作用,但是可能会保留某些东西,使得可以锁降级,这里在前面可能没有说明,这里补充一下)
3:锁降级
ReentrantReadWriteLock类可以按如下顺序将用于写入的锁降级为用于读取的锁:
用于读取的锁不能升级为用于写入的锁,即对应的写重入时,可能不会保留某些东西
4:快捷方法
ReentrantReadWriteLock类提供了获取等待中的线程个数的方法 getQueueLength ,以及检查是否获取 了用于写入锁的方法 isWriteLocked 等方法,他总该有其他的方法吧,而不是我们的这里的关键方法,就如在99章博客(如果有上篇和下篇和中篇,一般这样的说明代表是上下篇,可能也包括中篇,只是给出起点篇而已,所以可能存在100章博客中,所以若有上篇和下篇或者包括中篇,那么99就代表99和100,或者也有101,虽然99并没有,而中篇的在这里的博客,即101,102,103)中一个手写的栈就可以操作反向打印(查询)(自己写的其他方法哦,一般代表除了关键方法外,如添加,删除等等,我们一般将他们这样的方法称为补充方法,也称快捷方法,即其他(作用)的方法)
总结:相当于读写锁的逻辑,这里没有什么总结的,只要看懂代码流程即可
Thread-Per-Message模式:
该模式可以理解为"每个消息一个线程",消息这里可以理解为命令或请求,每个命令或请求分配一个线程,由 这个线程来处理,这就是Thread-Per-Message模式,在Thread-Per-Message模式中,所以自然消息的委托方和执行方是不同的线程,因为准备执行时,由其他人执行,类似于前面的ForkJoinPool类的分开操作,一个线程分开多个,只是他并没有什么等待操作,且没有线程保存
示例程序:
在此示例程序中,ConcurrentDemo类委托Host来显示字符,Host类会创建一个线程,来处理委托,启动的 线程使用Helper类来执行实际的显示
package main14 ;
public class Host {
private final Helper helper = new Helper ( ) ;
public void request ( final int count, final char c) {
System . out. println ( "\t请求:【" + count + "," + c + "】开始。。。" ) ;
new Thread ( ) {
@Override
public void run ( ) {
helper. handle ( count, c) ;
}
} . start ( ) ;
System . out. println ( "\t请求:【" + count + "," + c + "】结束!!!" ) ;
}
}
package main14 ;
public class Helper {
public void handle ( int count, char c) {
System . out. println ( "\t\t处理:【" + count + "," + c + "】开始。。。" ) ;
for ( int i = 0 ; i < count; i++ ) {
slowly ( ) ;
System . out. print ( c) ;
}
System . out. println ( "" ) ;
System . out. println ( "\t\t处理:【" + count + "," + c + "】结束!!!" ) ;
}
private void slowly ( ) {
try {
Thread . sleep ( 100 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
}
package main14 ;
public class ConcurrentDemo {
public static void main ( String [ ] args) {
System . out. println ( "主线程 -- 开始执行" ) ;
Host host = new Host ( ) ;
host. request ( 10 , 'A' ) ;
host. request ( 20 , 'B' ) ;
host. request ( 30 , 'C' ) ;
System . out. println ( "主线程 -- 执行结束" ) ;
}
}
Thread-Per-Message模式中的角色:
Client(委托方):
Client角色向Host角色发起请求,而不用关⼼Host角色如何实现该请求处理,当前案例中对应于ConcurrentDemo类,更简单(具体点)的就是main对应的线程,即主线程
Host:
Host角色收到Client角色请求后,创建并启用一个线程,新建的线程使用Helper角色来处理请求,当前案例中对应于Host类
Helper:
Helper角色为Host角色提供请求处理的功能,Host角色创建的新线程调用Helper角色,当前案例中对应于Helper类
要点:
1:提高响应性,缩短延迟时间
Thread-Per-Message模式能够提高与Client角色对应的Host角色的响应性,降低延迟时间,尤其是当handle操作⾮常耗时或者handle操作需要等待输入/输出时,效果很明显,即进行了分开操作,其中我们也可以为了缩短线程启动花费的时间,可以使用Worker Thread模式(后面会说明)
2:适用于操作顺序没有要求时
在Thread-Per-Message模式中,handle方法并不一定按照request方法的调用顺序来执行(就如多线程是抢占cpu来决定谁先执行的,所以多次的执行顺序不一样,所以需要操作没有顺序要求的,否则可能需要某些延迟操作,比较麻烦一点)
3:适用于不需要返回值时
在Thread-Per-Message模式中,request方法并不会等待handle方法的执行结束,即request是通常得不到handle的结果(因为是新的线程),当需要获取操作结果时,可以使用Future模式(后面会说明)
4:应用于服务器(具体如何应用,可以选择百度)
JUC包和Thread-Per-Message模式:
java.lang.Thread类:最基本的创建、启动线程的类
java.lang.Runnable接口:线程锁执行的任务接口
java.util.concurrent.ThreadFactory接口:将线程创建抽象化的接口
java.util.concurrent.Executors:用于创建实例的工具类
java.util.concurrent.Executor接口:将线程执行抽象化的接口
java.util.concurrent.ExecutorService接口:将被复用的线程抽象化的接口
java.util.concurrent.ScheduledExecutorService类:将被调度线程的执行抽象化的接口
总结:看成一个线程里面在创建线程即可,或者就是一个低配的线程池(只创建线程,线程池在当前主线程里操作,自然是一个线程里面创建线程)
Worker Thread模式:
在Worker Thread模式中,工人线程(worker thread)会逐个取回工作并进行处理,当所有工作全部完成 后,工人线程会等待新的工作到来,简单的说就是线程池操作,所以可以在前面这样说"其中我们也可以为了缩短线程启动花费的时间,可以使用Worker Thread模式(后面会说明)",即他是重复利用的(等待继续执行,执行后,继续等待(可以认为是自旋或者阻塞中))
Worker Thread模式也被称为Background Thread模式,有时也称为Thread Pool模式
示例程序:
ClientThread类的线程会向Channel类发送工作请求(委托)
Channel类的实例有五个工人线程进行工作,所有工人线程都在等待工作请求的到来
当收到工作请求后,工人线程会从Channel获取一项工作请求并开始工作,工作完成后,工人线程回到Channel那里等待下一项工作请求
package main15 ;
public class Channel {
private static final int MAX_REQUEST = 100 ;
private final Request [ ] requestQueue;
private int tail;
private int head;
private int count;
private final WorkerThread [ ] threadPool;
public Channel ( int threads) {
this . requestQueue = new Request [ MAX_REQUEST ] ;
this . head = 0 ;
this . tail = 0 ;
this . count = 0 ;
threadPool = new WorkerThread [ threads] ;
for ( int i = 0 ; i < threadPool. length; i++ ) {
threadPool[ i] = new WorkerThread ( "Worker-" + i, this ) ;
}
}
public void startWorkers ( ) {
for ( int i = 0 ; i < threadPool. length; i++ ) {
threadPool[ i] . start ( ) ;
}
}
public synchronized void putRequest ( Request request) {
while ( count >= requestQueue. length) {
try {
wait ( ) ;
} catch ( InterruptedException e) {
}
}
requestQueue[ tail] = request;
tail = ( tail + 1 ) % requestQueue. length;
count++ ;
notifyAll ( ) ;
}
public synchronized Request takeRequest ( ) {
while ( count <= 0 ) {
try {
wait ( ) ;
} catch ( InterruptedException e) {
}
}
Request request = requestQueue[ head] ;
head = ( head + 1 ) % requestQueue. length;
count-- ;
notifyAll ( ) ;
return request;
}
}
package main15 ;
import java. util. Random ;
public class ClientThread extends Thread {
private final Channel channel;
private static final Random RANDOM = new Random ( ) ;
public ClientThread ( String name, Channel channel) {
super ( name) ;
this . channel = channel;
}
@Override
public void run ( ) {
try {
for ( int i = 0 ; true ; i++ ) {
Request request = new Request ( getName ( ) , i) ;
channel. putRequest ( request) ;
Thread . sleep ( RANDOM . nextInt ( 1000 ) ) ;
}
} catch ( InterruptedException e) {
}
}
}
package main15 ;
public class ConcurrentDemo {
public static void main ( String [ ] args) {
Channel channel = new Channel ( 5 ) ;
channel. startWorkers ( ) ;
new ClientThread ( "张三" , channel) . start ( ) ;
new ClientThread ( "李四" , channel) . start ( ) ;
new ClientThread ( "王五" , channel) . start ( ) ;
}
}
package main15 ;
import java. util. Random ;
public class Request {
private final String name;
private final int number;
private static final Random RANDOM = new Random ( ) ;
public Request ( String name, int number) {
this . name = name;
this . number = number;
}
public void execute ( ) {
System . out. println ( Thread . currentThread ( ) . getName ( ) + " 执行 " + this ) ;
try {
Thread . sleep ( RANDOM . nextInt ( 1000 ) ) ;
} catch ( InterruptedException e) {
}
}
@Override
public String toString ( ) {
return "Request{" +
"name='" + name + '\'' +
", number=" + number +
'}' ;
}
}
package main15 ;
public class WorkerThread extends Thread {
private final Channel channel;
public WorkerThread ( String name, Channel channel) {
super ( name) ;
this . channel = channel;
}
@Override
public void run ( ) {
while ( true ) {
Request reqeust = channel. takeRequest ( ) ;
reqeust. execute ( ) ;
}
}
}
相当于多个线程在阻塞,只要你有一个,那么他们继续进行操作,然后继续阻塞,虽然这里将对应的拿取操作称为任务,使得相当于之前的Producer-Consumer模式了,但是主要的区别是,其中的生产是我们来进行操作的(虽然这里是操作无限循环,也就是无限的给任务,这就是主要区别),抛出与他的区别,也验证了他的线程是一直存在的,也就不用再次的创建线程来执行了(因为无限循环,自然使得他还是起作用,而不是销毁,使得不用创建线程了,即重复利用,即相当于线程池),实际上也验证了线程池的原理也算是一个典型的⽣产者—消费者模型,只是将对应的操作的同一个数据(如数组)改成任务(也可以是数组)了,所以一般的,我们将该模式也可以称为Producer-Consumer模式,只不过他倾向于一个固定的数量(如线程数量并不会很多),即将对方(用户)的操作者,变成自己(自己的线程,而不是用户的线程)的转变了,即是一个特殊的Producer-Consumer模式,用户(我们的人创建的线程,或者就是人,只是人消费时会是线程,但一般也是操作线程池的)变成内部(内部自己保存的线程)了,所以实际上Producer-Consumer模式与Worker Thread模式原理基本一样,本质只是量级的大小,Worker Thread模式比较小点(针对消费来说的,因为是完成,所以是针对消费),而Producer-Consumer模式一般根据用户来说的(如服务器,由于是Thread-Per-Message模式的补充(所以他Worker Thread模式也可以是该模式的补充),所以本质上才会说Thread-Per-Message模式的要点中有:“应用于服务器(具体如何应用,可以选择百度)”)
Worker Thread模式中的角色:
Client(委托者):
Client角色创建Request角色并将其传递给Channel角色,在本例中,ClientThread对应Client角色
Channel:
Channel角色接收来自Client角色的Request角色,并将其传递给Worker角色,在本例中,Channel类对 应Channel角色
Worker:
Worker角色从Channel角色中获取Request角色,并执行其逻辑,当一项工作结束后,继续从Channel获 取另外的Request角色,本例中,WorkerThread类对应Worker角色
Request:
Request角色表示工作,Request角色中保存了工作的逻辑,本例中,Request类对应Request角色
Worker Thread模式的优点:
1:提高吞吐量
如果将工作交给其他线程,当前线程就可以处理下一项工作,称为Thread Per Message模式,由于启动新线程消耗时间,可以通过Worker Thread模式轮流和反复地使用线程来提高吞吐量
2:容量控制
Worker角色的数量在本例中可以传递参数指定
Worker角色越多,可以并发处理的逻辑越多,同时增加Worker角色会增加消耗的资源(可以认为多个变量,自然会占用资源喽),必须根据程序实 际运行环境调整Worker角色的数量
3:调用与执行的分离
Worker Thread模式和Thread Per Message模式一样,方法的调用和执行是分开的(方法的调用是invocation,方法的执行是execution,只是代表英文名称),因为我调用你,但是却是开个线程执行的,自然分开,且他也操作了等待,至此才算认为是一个线程池,而不是前面的"类似于前面的ForkJoinPool类的分开操作"
这样,可以:
提高响应速度
控制执行顺序(虽然是自动的,而不是手动操作顺序,即不需要一定的顺序,也就是"适用于操作顺序没有要求时",即这个控制是没有要求的意思,这里要注意哦),因为执行不受调用顺序的制约
可以取消和反复执行
进行分布式部署,通过网络将Request角色发送到其他Woker计算节点进行处理
Runnable接口的意义:
java.lang.Runnable 接口有时用作Worker Thread模式的Request角色,即可以创建Runnable接口的 实现类对象表示业务逻辑,然后传递给Channel角色
Runnable对象可以作为方法参数,可以放到队列中,可以跨网络传输,也可以保存到文件中,如此则Runnable对象不论传输到哪个计算节点,都可以执行
也就是将我们要操作的Request角色的保存内容设置为线程的对象(即Runnable),线程池就是这样的,所以每次提交基本都是对应的Runnable的对象,虽然也可以是Callable
多态的Request角色:
本案例中,ClientThread传递给Channel的只是Request实例,但是WorkerThread并不知道Request类 的详细信息
所以即使我们传递的是Request的子类给Channel,WorkerThread也可以正常执行execute方法,那么若子类进行改变,即通过Request的多态,可以增加任务的种类(或者考虑其他的工作,如给某个变量加1),而无需修改Channel角色和Worker角色
JUC包和Worker Thread模式:
1:ThreadPoolExecutor类
java.util.concurrent.ThreadPoolExecutor 类是管理Worker线程的类,可以轻松实现Worker Thread模式
2:通过 java.util.concurrent 包创建线程池
java.util.concurrent.Executors 类就是创建线程池的工具类
总结:看成线程池即可
Future模式:
Future的意思是未来,假设由一个方法需要长时间执行才能获取结果,则一般不会让调用的程序等待,而是先 返回给它一张"提货卡",获取提货卡并不消耗很多时间,该"提货卡"就是Future角色
获取Future角色的线程稍后使用Future角色来获取运行结果,也就是说,你可以等下获取,很明显现在说的就是类似于前面的ForkJoinPool类了
示例程序:
package main16 ;
public interface Data {
String getContent ( ) ;
}
package main16 ;
public class FutureData implements Data {
private RealData realData = null ;
private boolean ready = false ;
public synchronized void setRealData ( RealData realData) {
if ( ready) {
return ;
}
this . realData = realData;
this . ready = true ;
notifyAll ( ) ;
}
@Override
public synchronized String getContent ( ) {
while ( ! ready) {
try {
wait ( ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
return realData. getContent ( ) ;
}
}
package main16 ;
public class Host {
public Data request ( final int count, final char c) {
System . out. println ( "\trequest(" + count + ", " + c + ") 开始" ) ;
final FutureData future = new FutureData ( ) ;
new Thread ( ) {
@Override
public void run ( ) {
RealData realData = new RealData ( count, c) ;
future. setRealData ( realData) ;
}
} . start ( ) ;
System . out. println ( "\trequest(" + count + ", " + c + ") 结束" ) ;
return future;
}
}
package main16 ;
public class Main {
public static void main ( String [ ] args) {
Host host = new Host ( ) ;
Data data1 = host. request ( 10 , 'A' ) ;
Data data2 = host. request ( 20 , 'B' ) ;
Data data3 = host. request ( 30 , 'C' ) ;
System . out. println ( "等待一会⼉再获取结果" ) ;
try {
Thread . sleep ( 2000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System . out. println ( "data1 = " + data1. getContent ( ) ) ;
System . out. println ( "data2 = " + data2. getContent ( ) ) ;
System . out. println ( "data3 = " + data3. getContent ( ) ) ;
}
}
package main16 ;
public class RealData implements Data {
private final String content;
public RealData ( int count, char c) {
System . out. println ( "\t组装RealData(" + count + ", " + c + ") 开始" ) ;
char [ ] buffer = new char [ count] ;
for ( int i = 0 ; i < count; i++ ) {
buffer[ i] = c;
try {
Thread . sleep ( 100 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
}
System . out. println ( "\t\t组装RealData(" + count + ", " + c + ") 结束" ) ;
this . content = new String ( buffer) ;
}
@Override
public String getContent ( ) {
return content;
}
}
通过上面的观察,的确实现了等待他的结果,其中由于锁的对象是不同的,所以对应的notifyAll();只操作对应的一个,即的确是等待结果,也就与ForkJoinPool类似(他由于是二分开的(前面是二分),一般是难测试出来,实际上是这样,你可以自己测试)
流程图:
Future模式中的角色:
Client(请求者):
Client角色向Host角色发出请求,并立即接收到请求的处理结果(也就是Future角 色)
Client角色不必知道返回值是RealData还是Future角色,只需要稍后通过VirtualData角色来操作返回即可,本案例中,对应Main类
Host:
Host角色创建新的线程,由新线程创建RealData角色,同时,Host角色将Future角色(当做VirtualData角色)返回给Client角色,本案例中对应Host类
VirtualData(虚拟数据):
VirtualData角色是让Future角色与RealData角色具有一致性的角色,本案例中对应Data接口,所以对应可以通过Data直接得到
RealData(真实数据):
RealData角色是表示真实数据的角色,创建该对象需要花费很多时间,本案例中对应RealData类
Future:
Future角色是RealData角色的"提货单"(可以通过他得到返回数据),由Host角色传递给Client角色,对Client而言,Future角色就是VirtualData角色,当Client角色操作Future角色时线程会wait,直到RealData角色创建完成
Future角色将Client角色的操作委托给RealData角色, 本案例中,对应于FutureData类
很明显,这里只是单纯的操作返回,而之前的ForkJoinPool他在操作二分中,还需要操作返回,所以可能整体类的关系不同,但是对应的返回流程是类似的,而正是因为他需要考虑太多,所以看起来不一致也是正常的
要点:
1:使用Thread Per Message模式,可以提高程序响应性,但是不能获取结果,Future模式也可以提高程序 响应性,还可以获取处理结果,而ForkJoinPool相当于Worker Thread模式(补充,自然包含分开了,线程池自身就算分开的,只不过只是一次而已,但是既然是模式,自然可以继续操作,所以包含分开)和Future模式的结合,但是他主要考虑ForkJoinPool
2:利用Future模式异步处理特性,可以提高程序吞吐量,虽然并没有减少业务处理的时长(需要继续结合分开,不是单纯的线程创建了,而是创建线程进行二分任务,属于分开的一种),但是如果考虑 到I/O,当程序进行磁盘操作时,CPU不只是处于等待状态,即会有空闲时间处理其他的任务(这里中间可以进行操作,稍后获取就行,而不用等待你完成),并且他本身也是异步,所以自然会提高效率
3:"准备返回值"和"使用返回值"的分离(上面2中的空闲时间)
4:如果想等待处理完成后获取返回值,还可以考虑采用回调处理方式,即,当处理完成后(最后),由Host角色启 动的线程调用Client角色的方法(可以自己定义),进行结果的处理,此时Client角色中的方法需要线程安全地传递返回 值
JUC包与Future模式:
java.util.concurrent包提供了用于⽀持Future模式的类和接口
java.util.concurrent.Callable接口将"返回值的某种处理调用"抽象化了,Callable接口声明了call方法,call方 法类似于Runnable的run方法,但是call方法有返回值,Callable表示Callable接口的call方法返回值类型 为泛型,自然可以是String类型
java.util.concurrent.Future接口相当于本案例中的Future角色,Future接口声明了get方法来获取结果,但是 没有声明设置值的方法,设置值的方法需要在Future接口的实现类中声明,Future表示"Future接口的get方法返回值类型可以是String类型”,除了get方法,Future接口还声明了用于中断运行的cancel方法
java.util.concurrent.FutureTask类是实现了Future接口的标准类,FutureTask类声明了用于获取值的get方 法、用于中断运行的cancel方法、用于设置值的set方法,以及用于设置异常的setException方法,由于FutureTask类实现了Runnable接口,还声明了run方法
Callable、Future、FutureTask的类图:
总结:看成ForkJoinPool(没有二分的)即可
在多线程设计模式中,只需要看懂对应的代码流程即可,具体的说明可以大致过一下
至此多线程设计模式说明完毕