CountDownLatch
日常开发中经常遇到一个线程需要等待一些线程都结束后才能继续向下运行
的场景,在CountDownLatch出现之前通常使用join方法来实现,但join方法不够灵活,所以开发了CountDownLatch。场景:一个等其他多个线程,或者多个等其他多个的场景
CountDownLatch比join方法来实现线程同步更加灵活。
CountDownLatch是使用AQS实现的,使用AQS的状态变量存放计数器的值。
- 初始化CountDownLatch时设置状态值(计数器的值)
- 多个线程调用countdown方法是原子性递减AQS状态值
- 当线程调用await方法后线程会被放入AQS的阻塞队列等待计数器为0再返回,其他线程调用countdown方法让计数器值-1,当计数器值变为0时,当前线程要调用doReleaseShared方法激活由于await()方法而被阻塞的线程。
示例
// 创建一个CountDownLatch实例
CountDownLatch countDownLatch = new CountDownLatch(2);//两个子线程 所以new2
public static void main(String[] args) throws InterruptedException {
// 线程池去操作线程
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 将线程池A添加到线程池
executorService.submit(new Runnable() {
@Override
public void run() {
try {
// 模拟运行时间
Thread.sleep(1000);
System.out.println("thread one over...");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 递减计数器
countDownLatch.countDown();
}
}
});
// 将线程池b添加到线程池
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("thread two over...");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
}
});
System.out.println("wait all child thread over!");
// 阻塞直到被interrupt或计数器递减至0(子线程执行完毕,返回)
countDownLatch.await();
System.out.println("all child thread over!");
executorService.shutdown();
}
输出为:
wait all child thread over!
thread one over...
thread two over...
all child thread over!
CountDownLatch相对于join方法的优点大致有两点:
- 调用一个子线程的join方法后,该线程会一直阻塞直到子线程运行完毕,而CountDownLatch允许子线程运行完毕或在运行过程中递减计数器,也就是说await方法不一定要等到子线程运行结束才返回。
- 使用线程池来管理线程一般都是直接添加Runnable到线程池,这时就没有办法再调用线程的join方法了,而仍可在子线程中递减计数器,也就是说CountDownLatch相比join方法可以更灵活地控制线程的同步。
类图结构
JDK开发组在何时初始化计数器,在何时递减计数器,当计数器变为0时做了什么操作,多个线程如何通过计时器实现同步。
由图可知,CountDownLatch是基于AQS实现的。把计数器的值赋给了AQS状态变量state,CountDownLatch的计数器值就是AQS的state值。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);// 把count赋值给AQS的state
}
源码解析
内部静态类 Sync
- 实际上是就是共享锁,内部静态类 Sync 继承了 AQS,实现了共享锁的 tryAcquireShared、tryReleaseShared
- 但是实现是反过来的
- 在 tryAcquireShared 获取同步状态的时候, state > 0 返回 false,state == 0,返回 true,从而达到如果 count 没减到零,那么调用 await 的线程全部进 CLH 去挂起
- 在 tryReleaseShared 释放同步状态的时候,state > 0 返回 false,state == 0 ,返回 true,从而达到调用 countDown 的线程已经够了,然后就会去 CLH 队列唤醒,因为是共享型的,所以唤醒一个之后,它会继续在 doAquireShared 的 setHeadAndProgate 中继续传播下去唤醒,从而所有 awite 的线程就可以继续执行了。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
// 获取同步状态
int getCount() {
return getState();
}
// 获取同步状态
@Override
protected int tryAcquireShared(int acquires) {
// 如果没有减为 0,那么进 CLH 等待
return (getState() == 0) ? 1 : -1;
}
// 释放同步状态
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
// countDown 的线程数够了,那就返回 true,然后 doAquireShared 里 head 后第一个线程就可以执行了。
return nextc == 0;
}
}
}
}
void await()
当线程调用CountDownLatch的await方法后,当前线程会被阻塞,直到CountDownLatch的计数器值递减至0或者其他线程调用了当前线程的interrupt方法。
线程获取资源时可以被中断,获取的资源是共享资源。
await方法委托sync调用了AQS的acquireSharedInterruptibly方法。
acquireSharedInterruptibly方法先判断当前线程是否被中断
- 是抛异常
- 否则调用sync实现的tryAcquireShared方法查看当前状态值(计数器的值)是否为0
- 是则从当前线程的await()方法直接返回
- 否则调用AQS的doAcquireSharedInterruptibly方法让当前线程阻塞。
// CountDownLatch的await()方法
public void await() throws InterruptedException {
// 允许中断(中断时抛出异常)
sync.acquireSharedInterruptibly(1); // await方法委托sync调用了AQS的acquireSharedInterruptibly方法
}
// AQS的acquireSharedInterruptibly方法
// AQS获取共享资源时可被中断的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果线程被中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// state=0时tryAcquireShared方法返回1,直接返回,否则执行doAcquireSharedInterruptibly方法
if (tryAcquireShared(arg) < 0)// tryAcquireShared检查当前状态值(计数器值)是否为0
// state不为0,调用该方法使await方法阻塞
doAcquireSharedInterruptibly(arg);
}
// Sync实现了AQS的tryAcquireShared接口
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// AQS的方法doAcquireSharedInterruptibly
// state不为0,调用该方法使await方法阻塞
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 获取state值,state=0时r=1,直接返回,不再阻塞
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 若state不为0则阻塞调用await方法的线程
// 等到其他线程执行countDown方法使计数器递减至0
// (state变为0)或该线程被interrupt时
// 该线程才能继续向下运行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
boolean await(long timeout, TimeUnit unit)
当线程调用CountDownLatch的await方法后,当前线程会被阻塞,直到CountDownLatch的计数器值递减至0返回true或设置的timeout时间到了,因为超时返回false,或者其他线程调用了当前线程的interrupt方法。
相较于上面的await方法,调用此方法后调用线程最多被阻塞timeout时间(单位由unit指定),即使计数器没有递减至0或调用线程没有被interrupt,调用线程也会继续向下运行。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
void countDown()
线程调用该方法后,计数器的值递减,递减后如果计数器的值为0则唤醒所有因await方法而被阻塞的线程,否则什么都不做。
递减计数器,当计数器的值为0(即state=0)时会唤醒所有因调用await方法而被阻塞的线程。
public void countDown() {
// 将计数器减1 委托sync调用AQS的releaseShared方法
sync.releaseShared(1);
}
// AQS的releaseShared方法
public final boolean releaseShared(int arg) {
// 调用sync实现的tryReleaseShared方法
if (tryReleaseShared(arg)) { // 当state递减=0时返回true执行AQS的doReleaseShared方法唤醒因调用await方法阻塞线程
// AQS的释放资源方法
doReleaseShared();
return true;
}
return false;
}
// Sync重写的AQS中tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
// 循环CAS直到当前线程成功完成CAS使计数器值(状态值state)减1并更新到state
for (;;) {
int c = getState();
// 如果state已经为0,直接返回
if (c == 0)
return false;
int nextc = c-1;
// 通过CAS递减state的值
if (compareAndSetState(c, nextc))
// 如果state被递减至0,返回true 返回true要唤醒因调用await方法而被阻塞的线程调用AQS的doReleaseShared方法来激活阻塞线程
return nextc == 0;
}
}
long getCount()
public long getCount() {
return sync.getCount();
}
// sync的getCount底层是调用getState方法获取state值
int getCount() {
return getState();
}
CyclicBarrier
CountDownLatch的计数器时一次性的,也就是说当CountDownLatch计数器至变为0后,再调用await和countDown方法会直接返回,起不到同步的效果了。而CyclicBarrier可以满足计数器可以重置的需要。
CyclicBarrier是回环屏障的意思,它可以使一组线程全部达到一个状态后再全部同时执行,然后重置自身状态又可用于下一次的状态同步。
回环
是因为当所有等待线程执行完毕,重置CyclicBarrier的状态可以被重用。
屏障
是因为线程调用await方法后就会被阻塞,这个阻塞点教屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务
- 内部是使用可重入独占锁 ReentrantLock(基于AQS) 及其 Condition 。
- 因为只有一个 await 操作,不像 countDownLatch 和 Semaphore 有释放操作,所以没法调用 release,只能用 conditon 来阻塞,不用AQS阻塞队列
示例
假设一个任务由阶段1、阶段2、阶段3组成,每个线程要串行地执行阶段1、阶段2、阶段3,当多个线程执行该任务时,必须要保证所有线程的阶段1都执行完毕后才能进入阶段2,当所有线程的阶段2都执行完毕后才能进入阶段3,可用下面的代码实现:1.2.3阶段都串行执行,1全部执行完才能2
public static void main(String[] args) {
// 等待两个线程同步
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 运行两个子线程,当两个子线程的step1都执行完毕后才会执行step2
// 当两个子线程的step2都执行完毕后才会执行step3
for(int i = 0; i < 2; i++) {
// 添加两个线程到线程池
executorService.submit(new Runnable() {
@Override
public void run() {
try{
System.out.println(Thread.currentThread() + " step1");
cyclicBarrier.await();
System.out.println(Thread.currentThread() + " step2");
cyclicBarrier.await();
System.out.println(Thread.currentThread() + " step3");
}catch (Exception e){
e.printStackTrace();
}
}
});
}
// 关闭线程池
executorService.shutdown();
}
输出如下:
Thread[pool-1-thread-1,5,main] step1
Thread[pool-1-thread-2,5,main] step1
Thread[pool-1-thread-1,5,main] step2
Thread[pool-1-thread-2,5,main] step2
Thread[pool-1-thread-2,5,main] step3
Thread[pool-1-thread-1,5,main] step3
每个子线程在执行完1阶段后都调用了await方法,等到所有线程都到达屏障点后才会一块往下执行(保证了所有线程都完成阶段1后才会开始执行阶段2)。如何在阶段2后面调用await方法(同上)。这个功能使用单个CountDownLatch无法完成的。
类图结构
CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务
- 内部是使用可重入独占锁 ReentrantLock(基于AQS) 及其 Condition 。
- 因为只有一个 await 操作,不像 countDownLatch 和 Semaphore 有释放操作,所以没法调用 release,只能用 conditon 来阻塞,不用 CLH 队列
属性
/**
* Generation 是 CyclicBarrier 内部静态类,描述了 CyclicBarrier 的更新换代。在CyclicBarrier中,同一批线程属于同一代 。
* 当有 `parties` 个线程全部到达 barrier 时,`generation` 就会被更新换代。其中 `broken` 属性,标识该当前 CyclicBarrier 是否已经处于中断状态。
*/
// 记录当前屏障是否被打破
private static class Generation {
// 默认 barrier 是没有损坏的。
boolean broken = false;
}
// 可重入独占锁
private final ReentrantLock lock = new ReentrantLock();
// 使用 condition 阻塞
private final Condition trip = lock.newCondition();
// 线程数
private final int parties;
// 所有线程到齐要执行的任务(由最后一个到的线程执行)
private final Runnable barrierCommand;
// 分代
private Generation generation = new Generation();
// 剩余线程数
private int count;
CyclicBarrier基于ReentrantLock独占锁实现,本质上还是基于AQS的。
-
parties用于记录线程个数(需要到达屏障点的线程数),表示多少个线程调用conditon 的 await(阻塞等待)方法后,所有线程才会冲破屏障往下运行。
-
count一开始等于parties(需要到达屏障点线程数),当由线程调用await方法时会递减1,当count变成0时到达屏障点,所有调用await的线程会一起往下执行,此时要重置CyclicBarrier,再次令count=parties。【count最大数 – 重置成最大数】
-
lock用于保证更新计数器count的原子性。lock的条件变量trip用于支持线程间使用await和signalAll进行通信。
构造函数
以下是CyclicBarrier的构造函数:
// 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
public CyclicBarrier(int parties) {
this(parties, null);
}
// 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作 barrierAction ,该操作由最后一个进入 barrier 的线程执行。public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;// barrierCommand是任务,执行时机是所有线程都到达屏障点后。
}
lock用于保证更新计数器count的原子性。lock的条件变量trip用于支持线程间使用await和signalAll进行通信。
源码分析
int await()
当前线程调用该方法时会阻塞,直到满足以下条件之一才会返回:
- parties个线程调用了await方法,也就是到达屏障点
- 其他线程调用了当前线程的interrupt方法
- Generation对象的broken标志被设置为true,抛出BrokenBarrierExecption
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);//不超时等待
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取锁,因为要使用 lock 的 condition 来阻塞线程
final ReentrantLock lock = this.lock;
// 获取锁成功才可以使用 condition
lock.lock();
try {
// 分代,同时用来检查是否被其他线程将 broken 设为 true 了
final Generation g = generation;
// 当前generation“已损坏”,抛出BrokenBarrierException异常
// 抛出该异常一般都是某个线程在等待某个处于“断开”状态的 CyclicBarrie
if (g.broken)
//当某个线程试图等待处于断开状态的 barrier 时,或者 barrier 进入断开状态而线程处于等待状态时,抛出该异常
throw new BrokenBarrierException();
//如果线程中断,终止CyclicBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 进来一个线程 count - 1,当 count 变为 0 的时候,会由那个线程执行 condition.signal
int index = --count;
//count == 0 表示所有线程均已到位,触发Runnable任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//触发任务
if (command != null)
command.run();
ranAction = true;
//唤醒所有等待线程,并更新generation
nextGeneration();
return 0;
} finally {
if (!ranAction) // 未执行,说明 barrierCommand 执行报错,或者线程打断等等情况。
breakBarrier();
}
}
// 走到这就说明 count 没有减为 0,那就要用 condition 来阻塞
for (;;) {
try {
// 如果不是超时等待,则调用 Condition.await()方法等待
if (!timed)
trip.await();
else if (nanos > 0L)
// 超时等待,调用 Condition.awaitNanos()方法等待,并获得等待的时间
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
// 到这说明所有线程都到期了,这个线程进入 CLH 队列后已经成功被唤醒了
// 虽然 ReentrantLock 是排他锁,但是已经剩下没几步,其他还在 CLH 队列的也等不了多久
if (g.broken)
// 如果被 broken 了,那抛异常
throw new BrokenBarrierException();
// generation已经更新,返回index
if (g != generation)
return index;
// “超时等待”,并且时间已到,终止CyclicBarrier,并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放锁
lock.unlock();
}
}
// 某个线程检查到被中断会调用这个方法,设置屏障已经破坏
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
// 当所有线程都已经到达 barrier 处(index == 0),则最后一个到达的线程会执行下面的方法
private void nextGeneration() {
//唤醒所有线程。
trip.signalAll();
//重置 count 。
count = parties;
//重置 generation 。
generation = new Generation();
}
boolean await(long timeout, TimeUnit unit)
相比于await(),等待超时会返回false。当线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回:
- parties个线程调用了await方法,也就是到达屏障点
- 设置的超时时机到了返回false
- 其他线程调用了当前线程的interrupt方法
- Generation对象的broken标志被设置为true,抛出BrokenBarrierExecption
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
// 第一个参数为true,第二个参数为超时时间
return dowait(true, unit.toNanos(timeout));
}
reset
// 重置 barrier 到初始化状态
// 通过组合 #breakBarrier() 和 #nextGeneration() 方法来实现。
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 这里不会导致其他在 condition 的线程抛出异常,因为它们还在 wait,要唤醒后才会检查是否 borken
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
getNumberWaiting
// 获得等待的线程数
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
isBroken
// 判断 CyclicBarrier 是否处于中断
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
场景和优势
场景:多个线程相互等待的场景:例如多线程结果合并的操作,用于多线程计算数据,最后合并计算结果的应用场景。
CyclicBarrier和CountDownLatch不同在于 CyclicBarrier 可以复用,适合分段任务有序执行的场景。
CyclicBarrier是通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量来是实现线程同步。
Semaphore
Semaphore信号量也是一个同步器,与CountDownLatch和CyclicBarrier不同的是,它内部的计数器是递增的,并且在初始化时可以指定计数器的初始值(通常为0),但不必知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。
示例
public static void main(String[] args) throws InterruptedException {
final int THREAD_COUNT = 2;
// 初始信号量为0
Semaphore semaphore = new Semaphore(0);
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++){
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread() + " over");
// 信号量+1
semaphore.release();
}
});
}
// 当信号量达到2时才停止阻塞 等待子线程执行完毕,返回
semaphore.acquire(2);// 调用acquire的线程会一直阻塞,信号量计数变为2才会返回 如果构造semaphore时传递的参数为N并在M个线程中调用了该信号量的release方法,那么在调用acquire使M个线程同步时传递的参数应该是M+N。
System.out.println("all child thread over!");
// 关闭线程池
executorService.shutdown();
}
类图结构
Semaphore限制可以访问某些资源(物理或逻辑的)的线程数目
-
实际上是就是普通共享锁,内部静态类 Sync 继承了 AQS,实现了共享锁的 tryAcquireShared、tryReleaseShared。
-
逻辑就是正常共享锁逻辑,同时还可以实现公平和非公平
-
可以说是 ReentrantLock 的共享版,所以注意要在 finally 里面调用 semaphore.release() 释放锁
Sync 抽象类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);// 给AQS的state赋值,state值为当前持有信号量的个数
}
final int getPermits() {
return getState();
}
// 父类提供了非公平的 tryAcquireShared 方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 和 tryReleaseShared 一个效果,但是不会唤醒 CLH 的线程,不过下一次 tryReleaseShared 唤醒后就可以用这个释放的。
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 重置为 0,但是就算重置为 0,后面通过 aquirceShared 也就能进 permits 个
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
Sync 实现类
- NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
- FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) // 多了个判断 CLH 队列还有没有等待的节点
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
构造方法
由图可知,Semaphore还是使用AQS实现的,Sync只是对AQS的一个修饰,并且Sync有两个实现类可以选取获取信号量时是否采用公平性策略(默认为非公平性的)。
public Semaphore(int permits) {
sync = new NonfairSync(permits);// 默认非公平策略
}
public Semaphore(int permits, boolean fair) {// 公平策略带两个参数
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
源码解析
void acquire()
表示当前线程希望获取一个信号量资源
- 如果当前信号量大于0,则当前信号量的计数减1,然后该方法直接返回。
- 否则如果当前信号量等于0,则当前线程被放入AQS的阻塞队列。
当其他线程调用了interrupt方法中断了线程,会抛出中断异常并返回。
public void acquire() throws InterruptedException {
// 参数为1说明要获取1个信号量资源
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 线程被中断抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 调用Sync子类方法尝试获取,这里根据构造函数决定公平策略
if (tryAcquireShared(arg) < 0)
// 如果获取失败则将当前线程放入阻塞队列。然后再次尝试,如果失败调用park方法挂起当前线程
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared由Sync的子类实现以根据公平性采取相应的行为。
以下是非公平策略NofairSync的实现:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取当前信号量值
int available = getState();
// 计算剩余值
int remaining = available - acquires;
// 如果剩余信号量小于0(当前信号量满足不了需求,直接返回负数,当前线程会被放入AQS的阻塞队列而被挂起)或者CAS更新信号量为剩余值成功直接返回剩余值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
假设线程A调用了acquire方法尝试获取信号量但因信号量不足被阻塞,这时线程B通过release增加了信号量,此时线程C完全可以调用acquire方法成功获取到信号量(如果信号量足够的话),这就是非公平性的体现。
下面是公平性的实现:
protected int tryAcquireShared(int acquires) {
for (;;) {
// 关键在于先判断AQS队列中是否已经有元素要获取信号量
if (hasQueuedPredecessors())// 公平策略是看当前节点的前驱节点是否也在等待获取该资源,如果是则放弃获取的权限,如何当前线程会被放入AQS阻塞队列,否则就去获取
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
hasQueuedPredecessors方法(锁原理剖析)用于判断当前线程的前驱节点是否也在等待获取该资源,如果是则自己放弃获取的权限,然后当前线程会被放入AQS中,否则尝试去获取。
void acquire(int permits)
可获取permits个信号量。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
void acquireUninterruptibly()
不对中断进行响应。中断了不会抛出异常返回。
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
void acquireUninterruptibly(int permits)
不对中断进行相应并且可获取多个信号量。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
void release()
使信号量加1,如果当前有线程因为调用acquire方法被阻塞而被放入AQS中的话,会根据公平性策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。
public void release() {
// arg=1
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 尝试释放资源(增加信号量)
if (tryReleaseShared(arg)) {
// 释放资源成功则根据公平性策略唤醒AQS中阻塞的线程
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {// 每次只对信号量增加1
for (;;) {
// 获取当前信号量值
int current = getState();
int next = current + releases;
// 将当前信号量增加releases,增加1
if (next < current) // overflow溢出处理
throw new Error("Maximum permit count exceeded");
// CAS操作保证更新信号量值的原子性
if (compareAndSetState(current, next))
return true;
}
}
void release(int permits)
可增加permits个信号量。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);// 可以多个线程同时使用CAS去新增信号量的值而不被阻塞。
}
CountDownLatch计数器检测为0就可以往下进行,比join等待线程执行完毕后主线程才往下执行更好。
CyclicBarrier可以达到CountDownLatch效果,CountDownLatch计数器变为0后就不能被复用,CyclicBarrier可以通过reset方法重置后复用,对同一个算法但是输入参数不同的类似场景比较使用。
Semaphore采用了信号量新增的策略,一开始并不需要关心同步的线程个数,等调用aquire方法时再指定需要同步的个数,并且提供了获取信号量的公平性策略。