1.CountDownLatch
1.1 什么是CountDownLatch
CountDownLatch
是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。
CountDownLatch
能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch
上等待的线程就可以恢复执行接下来的任务。
1.2 CountDownLatch与join
使用join
同样可以达到线程同步的效果,但是调用thread.join()
方法必须等thread 执行完毕,当前线程才能继续往下执行,而CountDownLatch
通过计数器提供了更灵活的控制,只要检测到计数器为0当前线程就可以往下执行而不用管相应的thread是否执行完毕。
而且如果我们使用线程池的话,就没有办法直接调用线程的join
方法了。所有另一方面来说,CountDownLatch
要比join
更加灵活。
1.3 CountDownLatch的基本使用
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
for (int i=0; i<=5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 运行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
}).start();
}
System.out.println("等待子线程运行结束");
latch.await();
System.out.println("子线程运行结束");
}
}
这里主线程会阻塞在latch.await()
,直到CountDownLatch
技术为0。
1.4 CountDownLatch原理剖析
CountDownLatch类图
从上面我们可以直到Sync
继承了AQS
类,CountDownLatch
又持有一个成员变量Sync
,所有我们可以直到CountDownLatch
是基于AQS实现的。
通过构造方法我们又可以得知CountDownLatch
计数器的值赋给了AQS的state
变量。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
await()
当线程调用await
方法以后,当前线程会被阻塞,直到下面情况之一时才会返回:
- 当所有的线程都调用了
CountDownLatch
的countDown
方法后,也就是计数器为0时 - 其他线程调用了当前线程的
interrupt()
方法中断了当前线程,当前线程会抛出异常然后返回
// CountDownLatch的await()实现
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS中获取共享资源可被中断的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 判断当前线程是否已被中断,如果是则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// state不为0(意味着CountDownLatch还没有减到0)
// 则执行AQS的doAcquireSharedInterruptibly方法,让当前线程进入AQS队列
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch中的Sync中tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
// 当前state如果为0则返回1否则返回-1
return (getState() == 0) ? 1 : -1;
}
由上述代码我们可以知道线程获取资源时可以被中断,并且获取的是共享资源。
名为await
的方法还有一个,不过多个参数,也就是指定时间后,调用await(long timeout, TimeUnit unit)
的线程会超时而返回false,如果是正常返回的,那么返回值就为true。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
countDown()
线程调用该方法以后,计数器的值会递减,递减后如果计数器的值为0,那么就会唤醒所有因调用await
方法而阻塞的线程,否则什么都不做。
// CountDownLatch中的countDown方法
public void countDown() {
sync.releaseShared(1);
}
// AQS中的方法
public final boolean releaseShared(int arg) {
// 调用Sync实现的tryReleaseShared方法
if (tryReleaseShared(arg)) {
// AQS中释放资源的方法
doReleaseShared();
return true;
}
return false;
}
// Sync中重写的tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
// 循环进行CAS操作,将state做减一操作,失败则一直重试
for (;;) {
// 获得当前的state变量
int c = getState();
// 如果state已经等于0,那么直接返回false
if (c == 0)
return false;
// 将state-1
int nextc = c-1;
// CAS操作修改state,成功以后判断state是否已经为0,为0则返回true,
// 再接下来就会调用AQS中的doReleaseShared方法释放资源
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
2.CyclicBarrier
2.1 什么是CyclicBarrier
从字面理解,CyclicBarrier就是回环屏障的意思,它可以让一组线程达到一个状态后再同时执行。
- 回环的意思是在所有线程执行完毕以后,会重置CyclicBarrier的状态使它可以被重用
- 屏障的意思是线程调用await方法后都会被阻塞,这个阻塞点就被称为屏障,等到所有屏障都调用了await方法后,线程们就会冲破屏障继续向下运行
2.2 CyclicBarrier的基本使用
public class CycleBarrierTest {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
// 当计数器为0时,立即执行
@Override
public void run() {
System.out.println("汇总线程:" + Thread.currentThread().getName() + " 任务合并。");
}
});
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 将线程A添加到线程池
executorService.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程A:" + Thread.currentThread().getName() + "执行任务。");
System.out.println("线程A:到达屏障点");
cyclicBarrier.await();
System.out.println("线程A:退出屏障点");
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 将线程B添加到线程池
executorService.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程B:" + Thread.currentThread().getName() + "执行任务。");
System.out.println("线程B:到达屏障点");
cyclicBarrier.await();
System.out.println("线程B:退出屏障点");
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 调用线程池的shutdown方法关闭线程池
// 该方法会使线程池从RUNNING状态转变为SHUTDOWN状态
// SHUTDOWN状态意味着:不再接收新的任务,但是会对任务队列中的任务进行处理
executorService.shutdown();
}
}
执行结果为:
线程A:pool-1-thread-1执行任务。
线程A:到达屏障点
线程B:pool-1-thread-2执行任务。
线程B:到达屏障点
汇总线程:pool-1-thread-2 任务合并。
线程B:退出屏障点
线程A:退出屏障点
上面的例子说明了多个线程之间是相互等待的,假如计数器值为N,那么随后调用 await 方法的 N–1 个线程都会因为到达屏障点而被阻塞,当第 N 个线程调用 await 后,计 数器值为 0 了,这时候第 N 个线程才会发出通知唤醒前面的 N–1 个线程。也就是当全部 线程都到达屏障点时才能一块继续向下执行。不过这个例子并没有体现出可重用性,不过这个其实也很好理解,就是可以反复使用,感兴趣的同学可以自己去了解一下。
2.3 CyclicBarrier原理剖析
CyclicBarrier类图
由类图可知,CyclicBarrier
基于独占锁实现,本质还是基于AQS实现的。
-
Generation
内部有一个变量broken
,用来记录当前屏障是否被打破,因为内部使用重入锁保证了线程安全,所以该属性不需要使用volatile
修饰 -
parties
用来记录线程个数,意味着parties
个线程调用await方法以后,才会“冲破屏障” -
count
一开始等于parties,每当有线程调用await
方法就会减一,当count
为零就意味着所有线程到达了屏障点
使用两个变量的原因就是为了达成CyclicBarrier
的复用性,当count
计数为0以后,会将parties
重新赋值给count
,从而进行复用。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
同时通过构造函数我们也可以直到,我们可以传递一个任务,而这个任务的执行时机是当所有的线程都到达屏障点以后。
await()
当线程调用await
方法被阻塞,直到满足以下条件之一时就会返回:
parties
个线程调用了await
方法,也就是所有线程都到达了屏障点- 其他线程调用了该线程的
interrupt
方法中断了该线程 - 与当前屏障点关联的
Generation
对象的broken
标志被设置为true时
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 调用dowait方法阻塞当前线程,第一个参数为false表示第二个参数不生效
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
// 指定timeout后会自动返回
return dowait(true, unit.toNanos(timeout));
}
dowait()
该方法实现了CyclicBarrier
的核心功能。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获得锁并进行加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 如果index=0就说明所有的线程都到达了屏障点,此时开始执行初始化传递的任务barrierAction
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 执行任务
if (command != null)
command.run();
ranAction = true;
// 激活其他因调用await阻塞的线程,并且重置了CyclicBarrier
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 如果index!=0
for (;;) {
try {
// 没有设置超时时间的操作
if (!timed)
trip.await();
// 设置了超时时间的操作
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
// 看到这里你应该就明白了为什么CyclicBarrier可以被复用
private void nextGeneration() {
// 唤醒条件队列中的阻塞线程
trip.signalAll();
// 重置CyclicBarrier
count = parties;
generation = new Generation();
}
3.CountDownLatch与CyclicBarrier
CountDownLatch
计数器不能重置,CyclicBarrier
可以重置循环利用。CountDownLatch
是基于AQS
的共享模式实现的,CyclicBarrier
是基于ReentrantLock
和Condition
实现的。- 两者最大的区别是,进行下一步动作的动作实施者是不一样的。这里的“动作实施者”有两种,一种是主线程(即执行main函数),另一种是执行任务的其他线程,后面叫这种线程为“其他线程”,区分于主线程。对于
CountDownLatch
,当计数为0的时候,下一步的动作实施者是main函数;对于CyclicBarrier
,下一步动作实施者是“其他线程”。