目录
1 前言
2 常用方法
3 示例
4 解析
4.1 countDown()
4.2 await() 源码
1 前言
countDownLatch( 门阀
、 计数器
)是多线程控制的一种工具 ,它用来协调各个线程之间的同步。
countDownLatch相当于一个计数器,能够使一个线程等待另外一些线程完成各自的工作后,再继续执行。这个计数器的初始值就是线程的数量,每当一个线程完成之后,计数器就进行减1,当计数器的值为0时,那么在countDownLatch上等待的线程就可以继续执行。
countDownLatch接收一个int类型的参数,表示要等待的工作线程个数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
2 常用方法
方法 | 说明 |
await() | 使当前线程进入同步队列进行等待,直到latch 的值被减到0 或者当前线程被中断,当前线程就会被唤醒。 |
await(long timeout, TimeUnit unit) | 带超时时间的await() 。 |
countDown() | 使latch 的值减1 ,如果减到了0 ,则会唤醒所有等待在这个latch 上的线程 |
getCount() | 获得latch 的数值 |
3 示例
让5个子线程的任务执行完成之后再执行主线程的任务。
public class CountDownLatchDemo {
private static final int THRED_NUM = 5;
public static void main(String[] args) {
//创建固定线城市数量的线程池
ExecutorService pool = Executors.newFixedThreadPool(THRED_NUM);
//如果有n个线程 就指定CountDownLatch的计数器为n
CountDownLatch countDownLatch = new CountDownLatch(THRED_NUM);
for (int i = 0; i < THRED_NUM; i++) {
pool.execute(()->{
try {
System.out.println("子线程:"+Thread.currentThread().getName()+"开始执行");
//模拟每个线程处理业务,耗时一秒钟
Thread.sleep(1000);
System.out.println("子线程:"+Thread.currentThread().getName()+"执行完成");
//当前线程调用此方法,则计数减1
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
}
//阻塞当前线程,直到计数器的值为0,主线程才开始处理
try {
countDownLatch.await();
System.out.println("等待子线程完成:,主线程"+Thread.currentThread().getName()+"开始执行,此时countDownLatch的计数器为0");
} catch (Exception e) {
e.printStackTrace();
}
//销毁线程池
pool.shutdown();
}
}
运行结果:
注意:主线程的输出语句是在子线程结束之后再进行输出。
4 解析
CountDownLatch
通过内部类Sync
来实现同步语义,而Sync又继承了AQS。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 设置同步状态的值
Sync(int count) {
setState(count);
}
// 获取同步状态的值
int getCount() {
return getState();
}
// 尝试获取同步状态,只有同步状态的值为0的时候才成功
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 尝试释放同步状态,每次释放通过CAS将同步状态的值减1
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
// 如果同步状态的值已经是0了,不要再释放同步状态了,也不要减1了
if (c == 0)
return false;
// 减1
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
4.1 countDown()
countDown()的源码如下:
public void countDown() {
sync.releaseShared(1);
}
调用的是AQS
的releaseShared(int arg)
方法:
public final boolean releaseShared(int arg) {
// 尝试释放同步状态
if (tryReleaseShared(arg)) {
// 如果成功,进入自旋,尝试唤醒同步队列中头结点的后继节点
doReleaseShared();
return true;
}
return false;
}
通过tryReleaseShared(arg)
尝试释放同步状态,具体的实现被Sync
重写了,源码:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
// 同步状态值减1
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
如果同步状态值减到0
,则释放成功,进入自旋,尝试唤醒同步队列中头结点的后继节点,调用的是AQS
的doReleaseShared()
函数:
private void doReleaseShared() {
for (;;) {
// 获取头结点
Node h = head;
if (h != null && h != tail) {
// 获取头结点的状态
int ws = h.waitStatus;
// 如果是SIGNAL,尝试唤醒后继节点
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒头结点的后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
这里调用了unparkSuccessor(h)
去唤醒头结点的后继节点。
4.2 await() 源码
await()
源码如下:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
调用的是AQS
的acquireSharedInterruptibly(int arg)
方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果被中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取同步状态
if (tryAcquireShared(arg) < 0)
// 获取同步状态失败,自旋
doAcquireSharedInterruptibly(arg);
}
首先,通过tryAcquireShared(arg)
尝试获取同步状态,具体的实现被Sync
重写了,查看源码:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
如果同步状态的值为0,获取成功。这就是CountDownLatch的机制,尝试获取latch的线程只有当latch的值减到0的时候,才能获取成功。
如果获取失败,则会调用AQS的doAcquireSharedInterruptibly(int arg)函数自旋,尝试挂起当前线程:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将当前线程加入同步队列的尾部
final Node node = addWaiter(Node.SHARED);
try {
// 自旋
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头结点,则尝试获取同步状态
if (p == head) {
// 当前节点尝试获取同步状态
int r = tryAcquireShared(arg);
if (r >= 0) {
// 如果获取成功,则设置当前节点为头结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 如果当前节点的前驱不是头结点,尝试挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
这里,调用shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()
挂起当前线程。
参考文章:CountDownLatch详解_西瓜游侠的博客-CSDN博客_countdownlatch