一、CountDownLatch介绍
- CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。
- CountDownLatch使用给定的计数值(count)初始化。
await
方法会阻塞直到当前的计数值被countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。- 这是一个一次性现象 —— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。
二、CountDownLatch的使用
构造器
2.1 常用方法
// 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
public void await() throws InterruptedException { };
// 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
// 会将 count 减 1,直至为 0
三、CountDownLatch应用场景
CountDownLatch一般用作多线程倒计时
计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成。
CountDownLatch的两种使用场景:
- 场景1:让多个线程等待
- 场景2:让单个线程等待。
3.1 场景1 让多个线程等待:模拟并发,让并发线程一起执行
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
//准备完毕……运动员都阻塞在这,等待号令
countDownLatch.await();
String parter = "【" + Thread.currentThread().getName() + "】";
System.out.println(parter + "开始执行……");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(2000);// 裁判准备发令
countDownLatch.countDown();// 发令枪:执行发令
}
3.2 场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并
- 很多时候,我们的并发任务,存在前后依赖关系;
- 比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后、需要进行结果合并;
- 或者多个数据操作完成后,需要数据check;这其实都是:在多个线程(任务)完成后,进行汇总合并的场景。
public class CountDownLatchTest2 {
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(() -> {
try {
Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
System.out.println(Thread.currentThread().getName()+" finish task" + index );
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。
countDownLatch.await();
System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
}
四、CountDownLatch实现原理
- 底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;
- 每次countDown()计数器值减一,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的。
- 而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。
4.1 CountDownLatch与Thread.join的区别
-
CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。
-
CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。
-
而 join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。
五、CountDownLatch核心源码分析
5.1 从构造方法查看
// CountDownLatch 的有参构造
public CountDownLatch(int count) {
// 健壮性校验
if (count < 0) throw new IllegalArgumentException("count < 0");
// 构建Sync给AQS的state赋值
this.sync = new Sync(count);
}
5.2 countDown方法
// countDown方法,本质就是调用了AQS的释放共享锁操作
// 这里的功能都是AQS提供的,只有tryReleaseShared需要实现的类自己去编写业务
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 唤醒在AQS队列中排队的线程。
doReleaseShared();
return true;
}
return false;
}
// countDownLatch实现的业务
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
// state - 1
int nextc = c-1;
// 用CAS赋值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// 如果CountDownLatch中的state已经为0了,那么再次执行countDown跟没执行一样。
// 而且只要state变为0,await就不会阻塞线程。
5.3 await方法
// await方法
public void await() throws InterruptedException {
// 调用了AQS提供的获取共享锁并且允许中断的方法
sync.acquireSharedInterruptibly(1);
}
// AQS提欧的获取共享锁并且允许中断的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// countDownLatch操作
if (tryAcquireShared(arg) < 0)
// 如果返回的是-1,代表state肯定大于0
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch实现的tryAcquireShared
protected int tryAcquireShared(int acquires) {
// state为0,返回1,。否则返回-1
return (getState() == 0) ? 1 : -1;
}
// 让当前线程进到AQS队列,排队去
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 将当前线程封装为Node,并且添加到AQS的队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再次走上面的tryAcquireShared,如果返回的是的1,代表state为0
int r = tryAcquireShared(arg);
if (r >= 0) {
// 会将当前线程和后面所有排队的线程都唤醒。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}