JUC源码系列-CountDownLatch源码研读

news2025/2/27 6:38:50

前言

CountDownLatch是一个很有用的工具,latch是门闩的意思,该工具是为了解决某些操作只能在一组操作全部执行完成后才能执行的情景。例如,小组早上开会,只有等所有人到齐了才能开;再如,游乐园里的过山车,一次可以坐10个人,为了节约成本,通常是等够10个人了才开。CountDown是倒数计数,所以CountDownLatch的用法通常是设定一个大于0的值,该值即代表需要等待的总任务数,每完成一个任务后,将总任务数减一,直到最后该值为0,说明所有等待的任务都执行完了,“门闩”此时就被打开,后面的任务可以继续执行。

CountDownLatch本身是基于共享锁实现的,如果你还不了解共享锁,建议先读一下共享锁的获取与释放,然后再继续往下看。

核心属性

CountDownLatch主要是通过AQS的共享锁机制实现的,因此它的核心属性只有一个sync,它继承自AQS,同时覆写了tryAcquireSharedtryReleaseShared,以完成具体的实现共享锁的获取与释放的逻辑。

private final Sync sync;
/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

构造函数

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

在构造函数中,我们就是简单传入了一个不小于0的任务数,由上面Sync的构造函数可知,这个任务数就是AQS的state的初始值。

核心方法

CountDownLatch最核心的方法只有两个,一个是countDown方法,每调用一次,就会将当前的count减一,当count值为0时,就会唤醒所有等待中的线程;另一个是await方法,它有两种形式,一种是阻塞式,一种是带超时机制的形式,该方法用于将当前等待“门闩”开启的线程挂起,直到count值为0,这一点很类似于条件队列,相当于等待的条件就是count值为0,然而其底层的实现并不是用条件队列,而是共享锁。

countDown()

public void countDown() {
    sync.releaseShared(1);
}

前面说过,countDown()方法的目的就是将count值减一,并且在count值为0时,唤醒所有等待的线程,它内部调用的其实是释放共享锁的操作:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

该方法由AQS实现,但是tryReleaseShared方法由Sync类自己实现:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

该方法的实现很简单,就是获取当前的state值,如果已经为0了,直接返回false;否则通过CAS操作将state值减一,之后返回的是nextc == 0,由此可见,该方法只有在count值原来不为0,但是调用后变为0时,才会返回true,否则返回false,并且也可以看出,该方法在返回true之后,后面如果再次调用,还是会返回false。也就是说,调用该方法只有一种情况会返回true,那就是state值从大于0变为0值时,这时也是所有在门闩前的任务都完成了。

tryReleaseShared返回true以后,将调用doReleaseShared方法唤醒所有等待中的线程,该方法我们在前面的文章中已经详细分析过了,这里就不再赘述了。

值得一提的是,我们其实并不关心releaseShared的返回值,而只关心tryReleaseShared的返回值,或者只关心count到0了没有,这里更像是借了共享锁的“壳”,来完成我们的目的,事实上我们完全可以自己设一个全局变量count来实现相同的效果,只不过对这个全局变量的操作也必须使用CAS。

await()

与Condition的await()方法的语义相同,该方法是阻塞式地等待,并且是响应中断的,只不过它不是在等待signal操作,而是在等待count值为0:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

可见,await方法内部调用的是acquireSharedInterruptibly方法,相当于借用了获取共享锁的“壳”:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

我们来回忆一下独占模式下对应的方法:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

可见,两者用的是同一个框架,只是这里:

  • tryAcquire(arg) 换成了 tryAcquireShared(arg) (子类实现)
  • doAcquireInterruptibly(arg) 换成了 doAcquireSharedInterruptibly(arg) (AQS提供)

我们先来看看Sync子类对于tryAcquireShared的实现:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

该方法似乎有点挂羊头卖狗肉的感觉——所谓的获取共享锁,事实上并不是什么抢锁的行为,没有任何CAS操作,它就是判断当前的state值是不是0,是就返回1,不是就返回-1。

值得注意的是,在共享锁的获取与释放中我们特别提到过tryAcquireShared返回值的含义:

  • 如果该值小于0,则代表当前线程获取共享锁失败
  • 如果该值大于0,则代表当前线程获取共享锁成功,并且接下来其他线程尝试获取共享锁的行为很可能成功
  • 如果该值等于0,则代表当前线程获取共享锁成功,但是接下来其他线程尝试获取共享锁的行为会失败

所以,当该方法的返回值不小于0时,就说明抢锁成功,可以直接退出了,所对应的就是count值已经为0,所有等待的事件都满足了。否则,我们调用doAcquireSharedInterruptibly(arg)将当前线程封装成Node,丢到sync queue中去阻塞等待:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在前面我们介绍共享锁的获取时,已经分析过了doAcquireShared方法,只是它是不抛出InterruptedException的,doAcquireSharedInterruptibly(arg)是它的可中断版本,我们可以直接对比一下:
doAcquireShared-vs-doAcquireSharedInterruptibly

可见,它们仅仅是在对待中断的处理方式上有所不同,其他部分都是一样的,由于doAcquireShared在前面的文章中我们已经详细分析过了,这里就不再赘述了。

await(long timeout, TimeUnit unit)

相较于await()方法,await(long timeout, TimeUnit unit)提供了超时等待机制:

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

注意,在tryAcquireSharedNanos方法中,我们用到了doAcquireSharedNanos的返回值,如果该方法因为超时而退出时,则将返回false。由于await()方法是阻塞式的,也就是说没有获取到锁是不会退出的,因此它没有返回值,换句话说,如果它正常返回了,则一定是因为获取到了锁而返回; 而await(long timeout, TimeUnit unit)由于有了超时机制,它是有返回值的,返回值为true则表示获取锁成功,为false则表示获取锁失败。doAcquireSharedNanos的这个返回值有助于我们理解该方法究竟是因为获取到了锁而返回,还是因为超时时间到了而返回。

至于doAcquireSharedNanos的实现细节,由于他和doAcquireSharedInterruptibly相比只是多了一个超时机制:
doAcquireSharedInterruptibly-vs-doAcquireSharedNanos

代码本身很简单,就不赘述了。

实战

接下来我们来学习一个使用CountDownLatch的实际例子,Java的官方源码已经为我们提供了一个使用的示例代码:

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();            // don't let run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

在这个例子中,有两个“闸门”,一个是CountDownLatch startSignal = new CountDownLatch(1),它开启后,等待在这个“闸门”上的任务才能开始运行;另一个“闸门”是CountDownLatch doneSignal = new CountDownLatch(N), 它表示等待N个任务都执行完成后,才能继续往下。

Worker实现了Runnable接口,代表了要执行的任务,在它的run方法中,我们先调用了startSignal.await(),等待startSignal这一“闸门”开启,闸门开启后,我们就执行自己的任务,任务完成后再执行doneSignal.countDown(),将等待的总任务数减一。

代码本身的逻辑非常简单好懂,这里不赘述了。

总结

  • CountDownLatch相当于一个“门栓”,一个“闸门”,只有它开启了,代码才能继续往下执行。通常情况下,如果当前线程需要等其他线程执行完成后才能执行,我们就可以使用CountDownLatch。
  • 使用CountDownLatch#await方法阻塞等待一个“闸门”的开启。
  • 使用CountDownLatch#countDown方法减少闸门所等待的任务数。
  • CountDownLatch基于共享锁实现。
  • CountDownLatch是一次性的,“闸门”开启后,无法再重复使用,如果想重复使用,应该用[CyclicBarrier]()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/412865.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

运行时内存数据区之堆(二)

Minor GC、Major GC、与Full GC JVM在进行GC时&#xff0c;并非每次都对上面三个内存&#xff08;新生代、老年代&#xff1a;方法区&#xff09;区域一起回收的&#xff0c;大部分时候回收的都是指新生代。 针对HotSpot VM的实现&#xff0c;它里面的GC按照回收区域又分为两…

浅谈 如果做微服务了 这个模块怎么去划分?

如果做微服务了 这个模块怎么去划分&#xff1f; 还是高内聚 低耦合的一个思想吧 &#xff0c;单一职责的设计原则&#xff0c;也是一个封装的思想吧&#xff0c; 业务维度&#xff1a; ​ 按照业务的关联程度来决定&#xff0c;关联比较密切的业务适合拆分为一个微服务&…

C++语法(14)---- 模板进阶

C语法&#xff08;13&#xff09;---- 模拟实现priority_queue_哈里沃克的博客-CSDN博客https://blog.csdn.net/m0_63488627/article/details/130069707?spm1001.2014.3001.5501 目录 1.非类型模板参数 2.模板的特化 1.函数模板(仿函数) 2.类模板 1.全特化 2.半特化、偏…

INFINONE XC164单片机逆向记录(6)C语言学习

本人所写的博客都为开发之中遇到问题记录的随笔,主要是给自己积累些问题。免日后无印象,如有不当之处敬请指正(欢迎进扣群 24849632 探讨问题); 写在专栏前面https://blog.csdn.net/Junping1982/article/details/129955766 INFINONE XC164单片机逆向记录(1)资料准备

FusionCharts Suite XT v3.20.0 Crack

FusionCharts Suite XT v3.20.0 改进了仪表的径向条形图和调整大小功能。2023 年 4 月 11 日 - 9:37新版本特征 添加了一个新方法“_changeXAxisCoordinates”&#xff0c;它允许用户将 x 轴更改为在图例或数据交互时自动居中对齐。更新了 Angular 集成以支持 Angular 版本 14 …

【无功优化】基于多目标差分进化算法的含DG配电网无功优化模型【IEEE33节点】(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

SAM - 分割一切图像【AI大模型】

如果你认为 AI 领域已经通过 ChatGPT、GPT4 和 Stable Diffusion 快速发展&#xff0c;那么请系好安全带&#xff0c;为 AI 的下一个突破性创新做好准备。 推荐&#xff1a;用 NSDT场景设计器 快速搭建3D场景。 Meta 的 FAIR 实验室刚刚发布了 Segment Anything Model (SAM)&am…

电脑软件:推荐一款Windows剪贴板增强软件——ClipX

目录 ClipX能做什么&#xff1f; 软件优点 软件不足之处 今天要介绍的剪切板神器——ClipX&#xff0c;拥有它可以作为弥补Windows 自带的剪贴板的短板的增强型工具软件。 ClipX能做什么&#xff1f; 1. 扩充剪贴板数量&#xff0c;数量可以自己设置 ClipX支持4到1024个剪…

Flutter(三)--可滚动布局

之前介绍了布局和容器&#xff0c;它们都用于摆放一个或多个子组件&#xff0c;而实际应用中&#xff0c;受限于手机、Pad、电脑的屏幕大小&#xff0c;一个布局不可能摆放无限个组件&#xff0c;我们往往采取滚动的方式&#xff0c;来使得一部分组件展示在屏幕上&#xff0c;一…

L2-041 插松枝PTA

人造松枝加工场的工人需要将各种尺寸的塑料松针插到松枝干上&#xff0c;做成大大小小的松枝。他们的工作流程&#xff08;并不&#xff09;是这样的&#xff1a; 每人手边有一只小盒子&#xff0c;初始状态为空。每人面前有用不完的松枝干和一个推送器&#xff0c;每次推送一…

piwigo安装及初步使用

一 摘要 本文主要介绍piwigo 安装及初步使用&#xff0c;nginx \php\mysql 等使用 docker 安装 二 环境信息 2.1 操作系统 CentOS Linux release 7.9.2009 (Core)2.2 piwigo piwigo-13.6.0.zip三 安装 3.1安装资源下载 piwigo 请到官网下载https://piwigo.org 安装步骤也…

【STL九】关联容器——map容器、multimap容器

【STL九】关联容器——map容器、multimap容器一、map简介二、头文件三、模板类四、map的内部结构五、成员函数1、迭代器2、元素访问3、容量4、修改操作~~5、操作~~5、查找6、查看操作六、demo1、查找find2、查找lower_bound、upper_bound3、insert、emplace() 和 emplace_hint(…

超详细!Apache+Tomcat+mod_jk搭建负载均衡集群

目录 0.流程图&#xff1a; 1.集群环境&#xff1a; 2.Apache服务器安装httpd&#xff1a; 3.tomcat1服务器和tomcat2服务器安装jdk和Tomcat 4.tomcat1服务器和tomcat2服务器创建页面&#xff1a; 5.Apache服务器的mod_jk模块的安装&#xff1a; 6.查看是否mod_jk.so模块…

DMDSC问题测试

问题一&#xff1a;手动停止两节点&#xff0c;单独启动节点二测试 集群停库前状态&#xff0c;登录监视器查看 dmcssm INI_PATHdmcssm.ini show 节点一&#xff1a; [dmdbalocalhost ~]$ DmServiceDMSERVER stop Stopping DmServiceDMSERVER: …

Go语言开发小技巧易错点100例(六)

往期回顾&#xff1a; Go语言开发小技巧&易错点100例&#xff08;一&#xff09;Go语言开发小技巧&易错点100例&#xff08;二&#xff09;Go语言开发小技巧&易错点100例&#xff08;三&#xff09;Go语言开发小技巧&易错点100例&#xff08;四&#xff09;Go…

微信小程序开发-云开发降低资源调用次数

问题 微信小程序云开发是很方便&#xff0c;减少了后端的大量工作&#xff0c;但是&#xff01; 流量主的一点广告费&#xff0c;一不小心就全被腾讯薅走了&#xff01;当然一种办法就是使用云服务器自建后端&#xff0c;也要付费&#xff0c;没有对比过&#xff0c;不知道各…

如何在移动应用中集成美颜SDK实现人脸识别和美化功能?

随着移动应用的普及和人们对美的追求&#xff0c;美颜功能已成为很多应用的必备功能。而为了实现这样的功能&#xff0c;开发者需要使用美颜SDK。本文将从以下几个方面介绍如何在移动应用中集成美颜SDK实现人脸识别和美化功能。 一、美颜SDK的介绍 美颜SDK是一种用于美化人脸的…

应用程序接口(API)安全的入门指南

本文简单回顾了 API 的发展历史&#xff0c;其基本概念、功能、相关协议、以及使用场景&#xff0c;重点讨论了与之相关的不同安全要素、威胁、认证方法、以及十二项优秀实践。 根据有记录的历史&#xff0c;随着 Salesforce 的销售自动化解决方案的推出&#xff0c;首个 We…

缩短客户响应时间的 5 种方法

在当今竞争激烈的世界中&#xff0c;客户服务就是确保卓越的客户体验。这意味着顶级品牌必须竞争为客户提供最好的客户服务&#xff0c;而且提供最快的响应时间。 改善客户服务响应时间的 5种方法 1.使用正确的客户服务软件 客户服务软件是您可以为提高客户服务而进行的最佳投资…

手写Spring框架-前奏-注解与自定义注解

目录 注解 介绍 功能 分类 注解处理器类库 自定义注解 常用元注解 自定义 注解 介绍 提供一种为程序元素设置元数据的方法用来将任何的信息或元数据&#xff08;metadata&#xff09;与程序元素&#xff08;类、方法、成员变量等&#xff09;进行关联元数据是指数据的…