知道CountDownLatch是做什么的,那你知道它的底层是如何实现的吗?

news2025/1/12 7:42:20

一、概述

CountDownLatch是一个多线程控制工具,用来控制线程的等待。设置需要countDown的数量num,然后每一个线程执行完毕后,调用countDown()方法,而主线程调用await()方法执行等待,直到num个子线程执行了countDown()方法 ,则主线程解除阻塞,开始继续执行。

其具体操作流程类似火箭发射,我们通过倒数三二一(3个子线程分别调用countDown()),那么火箭就发射升空了(主线程await()方法处就释放了阻塞,可以继续向下执行):

代码上的使用方法如下所示:

首先创建CountDownLatch实例对象,并传入需要倒数的count值;
其次】在主线程处通过调用await()方法进行阻塞操作;
最后】当子线程执行完某个任务之后,调用countDown()方法执行倒计时减1操作;当倒计时为0的时候,主线程解除阻塞,继续执行await()方法下面的代码逻辑;

我们以实例CountDownLatchDemo为例,看一下具体的代码实现:

二、构造函数解析

在CountDownLatch的构造函数中,我们通过指定入参count的值,来设置需要调用多少次countDown()方法才会释放对当前线程的阻塞。构造方法逻辑比较简单,如果我们设置的count值小于0,则说明是一个违规值,会随之抛出IllegalArgumentException异常;代码如下所示:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

如果设置的count值是合法值,那么则通过setState(count)方法,将count赋值给AQS中的state变量。那么这个state值,就可以用来做倒计时的计数用了,如果为0,则表示倒计时结束,否则,则依然无法解除主线程的阻塞状态。

三、await()方法源码解析

从上面的演示示例中,我们已经看到,通过在主线程中调用countDownLatch.await()方法,使得主线程进入阻塞状态,那么其内部是如何实现的呢?我们把视角转移到await()方法中。在其方法内,只有一行代码,即,调用sync的acquireSharedInterruptibly(1)方法,此处需要额外说明一下,这个sync其实是继承了AQS类的实例对象,所以,它同时也具备了AQS的所有功能,那么从这里大家也能得出一个结论,就是CountDownLatch所具备的能力其实底层都是通过AQS实现的。代码如下所示:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly(...)方法中,如果发现发生过interrupt,则抛出InterruptedException异常;如果没发生过interrupt,则通过调用tryAcquireShared(arg)方法来判断是否倒计时已经结束了,如果state等于0,则表示倒计时结束了,那么该方法返回1,否则,返回-1;如果倒计时没有结束(即:tryAcquireShared(arg)返回-1),则继续执行doAcquireSharedInterruptibly(arg)方法,代码如下所示:

此处展示了tryAcquireShared(arg)方法的内部处理逻辑,即:如果state等于0,则表示倒计时结束了,那么该方法返回1,否则,返回-1;代码如下所述:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; // 1表示倒计时结束;-1表示倒计时进行中;
}

如果倒计时没有结束,则会执行doAcquireSharedInterruptibly(-1)方法,在该方法内部主要由四部分逻辑组成,下面我们也会依次针对这些部分进行详细解析:

步骤1】创建AQS队列,将Node节点放到队列中。
步骤2】如果倒计时未完成,则执行阻塞操作。
步骤3】如果倒计时完成,解除阻塞操作。
步骤4】如果存在异常发生,则对失败进行收尾工作。

3.1> 创建AQS队列

因为在上面已经说过——CountDownLatch所具备的能力其实底层都是通过AQS实现的。而AQS底层就是通过维护节点链表实现的抢锁行为,那么对于CountDownLatch我们也需要创建这样一个链表数据结构,这部分逻辑就在addWaiter(Node.SHARED)方法中。此处需要额外说明一下的就是,对于入参值Node.SHARED,仅仅是一个空属性值的Node节点。

addWaiter(...)方法内部,主要针对两种情况进行了处理:

情况1】如果链表已经创建过了,那么直接讲node放置到链表末尾即可,返回node;
情况2】如果没创建,则创建链表,并将node插入到链表末尾,返回node;

针对enq(node)方法的内部逻辑,下图以节点数据结构进行了进一步的解释,请见下图所示:

3.2> 执行阻塞操作

当我们执行完上面的addWaiter(Node.SHARED)方法,创建了AQS队列之后,我们就开始了下面的无限for循环逻辑。在for(;;)无限循环中,会尝试获得r值,其含义如下所示:

r==1】表示state等于0,倒计时完毕。
r==-1】表示state不等于0,倒计时还在进行中。

那么,此处我们的前提条件就是——倒计时还在进行中;所以r等于-1,无法满足下面一行的if(r>=0)的判断条件,所以,不执行该if逻辑。而是直接跳转到“步骤3:执行阻塞操作”这部分红框代码中了,具体请见下图所示:

在“步骤3:执行阻塞操作”这步骤中,主要执行了两个方法:shouldParkAfterFailedAcquire(p, node)parkAndCheckInterrupt(),下面我们就分别来分析这两个方法的具体执行过程。

shouldParkAfterFailedAcquire(p, node)方法中,会执行如下逻辑:

static final int CANCELLED =  1; /** waitStatus value to indicate thread has cancelled */
static final int SIGNAL    = -1; /** waitStatus value to indicate successor's thread needs unparking */
static final int CONDITION = -2; /** waitStatus value to indicate thread is waiting on condition */
static final int PROPAGATE = -3; /** waitStatus value to indicate the next acquireShared should unconditionally propagate */

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; 
    if (ws == Node.SIGNAL) // SIGNAL值,表示后继线程需要unparking
        return true;
    if (ws > 0) { // ws大于0,说明是CANCELLED节点,清理该节点
        do { 
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 否则,将当前节点赋值为SIGNAL
    }
    return false;
}

如果节点的waitStatus == -1】则直接返回true
如果节点的waitStatus > 0】说明是CANCELLED节点,那么清理该节点及所有相邻前置的CANCELLED节点,并返回false;
如果节点的waitStatus是其他值】通过CAS将节点的waitStatus值变为-1(Node.SIGNAL),并返回false

那么由于head指针指向的Node节点waitStatus等于0,所以,第一次执行shouldParkAfterFailedAcquire(...)方法之后,head节点的waitStatus从0变为-1;那么当再次执行shouldParkAfterFailedAcquire(...)方法的时候,则满足:waitStatus == -1,直接返回true了,请见下图所示:

shouldParkAfterFailedAcquire(p, node)方法在执行第二遍之后返回了true,那么就轮到触发parkAndCheckInterrupt()方法的时刻了,它内部逻辑非常简单,就是执行了两个步骤:步骤1,调用LockSupport.park方法对当前线程进行阻塞;步骤2,解除阻塞后,如果发生了interrupt,则返回true;否则返回false;代码如下所示:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 线程在此处被阻塞
    return Thread.interrupted(); // 如果发生了interrupt,则返回true;否则返回false
}

3.3> 解除阻塞操作

当“倒计时”结束,即:执行了足够次数的countDown()方法(此步骤会在“四、countDown()方法源码解析”章节进行介绍);则会触发解除阻塞的操作了,即:下图红框内的代码。

那么在上述红框代码中,关键的代码逻辑就是setHeadAndPropagate(node, r),其中:node为存储了当前线程的节点(即:node.thread=主线程),r等于1 ;

setHeadAndPropagate(node, r)方法的作用是用于,请见如下源码所示:

static final Node SHARED = new Node(); // 空值节点

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 暂存旧的头节点
    setHead(node); // 设置新的头节点
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;  // 获取node的下一个节点
        if (s == null || s.isShared()) // 如果node就是尾节点或者s.nextWaiter等于SHARED
            doReleaseShared();
    }
}

/**
 * 设置头节点
 */
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

/**
 * 如果node在共享模式下等待,返回true。
 */
final boolean isShared() {
    return nextWaiter == SHARED;
}

对于CountDownLatch来说,doReleaseShared()方法其实没有什么作用,因为原本链表就两个节点,一个虚拟头结点(head指针),一个是当下主线程节点(tail指针);当head指针指向下一个节点时,则head==tail,那么就会直接break跳出无限for循环(for(;;)

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 由于此时h等于tail,所以不满足if判断
        if (h != null && h != tail) { 
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
                    continue;         
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;               
        }

        // 由于h等于tail(可参照3.2画的链表图),跳出该方法
        if (h == head) break;
    }
}

执行完上面所说的setHeadAndPropagate(node, r)方法之后,基本就可以结束await()方法的逻辑,继续执行主线程剩下的逻辑代码了。

3.4> 针对执行失败后的收尾工作

如果顺利的解除阻塞的话,failed变量会被赋值为false,那么在finally中的cancelAcquire(node)方法则不会被调用。反之,如果failed等于true,则说明阻塞并未按照正常的unpark方式解除阻塞,即,通过异常的方式解除的阻塞,那么我们就需要执行cancelAcquire(node)方法进行失败后的收尾工作了,具体代码如下所示:

cancelAcquire方法中,尝试在AQS的队列链表中断开node节点的,即,剔除掉node节点。由于此处并非主流程,所以具体的代码和注释如下所示,就不再赘述了。

private void cancelAcquire(Node node) {
    if (node == null) // node是保存了主线程的节点,不为空
        return;

    node.thread = null; // 将node保存的线程置为空,即,丢弃之前保存的主线程
    Node pred = node.prev;
    while (pred.waitStatus > 0) // pred的waitStatus等于-1,不满足
        node.prev = pred = pred.prev;

    Node predNext = pred.next; // predNext等于null
    node.waitStatus = Node.CANCELLED;
    // node等于tail,将tail指针指向pred节点
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null); // 将p
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

四、countDown()方法源码解析

子线程通过调用countDown()方法来实现“倒计时”操作,所以,下面我们就来着重分析一下这个方法的具体执行过程,代码如下所示:

public void countDown() {
    sync.releaseShared(1);
}

releaseShared(1)方法中,首先通过tryReleaseShared(arg)进行判断,只有倒计时最后一次countDown调用才会返回true,其他情况都会返回false;而如果返回的是true,才会继续执行if方法内的逻辑,即:doReleaseShared()方法。

4.1> tryReleaseShared(arg)

在该方法内部,首先开启了无限for循环,那么首先获取了当前的倒计时总数state的值,如果等于0,则说明在本次调用countDown()方法之前,倒计时就已经结束了,则此时直接返回false

如果倒计是没有结束,则继续往下执行,先将倒计时总数减1,如果等于0,则说明本次调用countDown()方法是倒计时的最后一次,那么应该可以触发后续的解除主线程阻塞的操作了,那么此时直接返回true;但是,如果不等于0,则表示倒计时仍在继续中,则此时直接返回false;具体代码如下所示:

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState(); // 获得倒计时总数state
        if (c == 0) return false; // 如果等于0,则表示倒计时结束,返回false
        int nextc = c - 1; // 否则,倒计时总数减1
        if (compareAndSetState(c, nextc)) // 然后将最新的倒计时数,更新到state的值
            return nextc == 0; // 如果等于0,返回true;否则,返回false
    }
}

4.2> doReleaseShared()

doReleaseShared()方法中,我们要开始真正的执行解除阻塞的操作了。方法首先开启了无限for循环,然后进行了一系列的判断,对于当前AQS队列的情况,上面已经通过图的方式表现了,为了便于大家回忆,我又把它粘贴到了doReleaseShared()方法源码的下面,此时h不等于null,并且h不等于tail,并且h的waitStatus等于-1(Node.SIGNAL),所以是可以顺利执行unparkSuccessor(h)这行代码的;当解除阻塞后,此时head指针向后移动一个节点,那么在第二次循环时,由于无法满足h!=tail,则执行第14行——if(h==head) break;跳出无限循环,结束本方法了。具体代码如下所示:

unparkSuccessor(h)方法中,我们获得了head头节点的下一个节点s,即:也就是保存主线程的节点,然后针对s节点存储的thread(即:主线程)执行unpark操作。因此,主线程的阻塞被解除了。具体代码如下所示:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus; // 此时ws等于0
    if (ws < 0) compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next; // s就是head节点的next节点,也就是保存主线程的节点

    // s不等于null,并且s.waitStatus等于0
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }

    // 针对s中存储的thread(即:主线程)执行unpark操作
    if (s != null) LockSupport.unpark(s.thread);
}

今天的文章内容就这些了:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享 。

更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/945343.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity——工程与资源

本文将详细介绍Unity工程的文件夹结构&#xff0c;以及动态加载资源的技术要点 一、Unity项目的文件夹结构 1.工程文件夹 在新建工程时&#xff0c;Unity会创建所有必要的文件夹。第一级文件夹有Assets,Library,Logs,Packages,ProjectSettings。 Assets&#xff1a;最主要的文…

Win10永恒之黑CVE-2020-0796复现shell

鸣谢文章&#xff1a; CVE-2020-0796&#xff08;永恒之黑&#xff09;漏洞利用getshell复现详细过程 影响版本&#xff1a; Windows 10 Version 1903 for 32-bit Systems Windows 10 Version 1903 for x64-based Systems Windows 10 Version 1903 for ARM64-based Systems …

实训笔记8.29

实训笔记8.29 8.29笔记一、《白龙马电商用户行为日志分析平台》项目概述--大数据离线项目1.1 项目的预备知识1.1.1 电商平台1.1.2 用户行为数据1.1.3 常见的软件/网站的组成和技术实现1.1.4 大数据中数据计算场景 1.2 项目的开发背景和开发意义1.3 项目的开发流程和技术选项1.4…

【真题解析】系统集成项目管理工程师 2022 年下半年真题卷(综合知识)

本文为系统集成项目管理工程师考试(软考) 2022 年下半年真题&#xff08;全国卷&#xff09;&#xff0c;包含答案与详细解析。考试共分为两科&#xff0c;成绩均 ≥45 即可通过考试&#xff1a; 综合知识&#xff08;选择题 75 道&#xff0c;75分&#xff09;案例分析&#x…

聊聊十大网络安全上市公司,看F5拥有强大安全基因

在应用数量爆炸式增长的当下&#xff0c;包括供应链攻击、零日漏洞及数据泄露在内的安全威胁随处可见。从传统应用到现代应用再到边缘、多云、多中心的安全防护&#xff0c;安全已成为企业数字化转型中的首要挑战。谈到十大网络安全上市公司&#xff0c;拥有强大安全基因的F5是…

如何利用 Agent 构建AI服务

近年来&#xff0c;人工智能&#xff08;AI&#xff09;技术的飞速发展引起了广泛的关注和讨论。而如今&#xff0c;我们正站在一个全新的时代门槛前&#xff0c;面对着AI Agent带来的的崭新未来。以LLM&#xff08;大型语言模型&#xff09;作为其核心控制器构建代理是一个很酷…

ubuntu系统安装qemu虚拟机

安装命令 sudo apt install qemu qemu-kvm virt-manager bridge-utils -y 安装过程 安装完成之后需要重新启动操作系统 软件使用 拷贝系统镜像进行安装即可&#xff0c;跟vmware 和virbox也都类似

Springboot启动之后自动跳转浏览器

2023-08-29 20:47:32.680 INFO 23700 --- [ main] com.liu.ReggieApplication : 项目启动成功。。。当项目启动成功后&#xff0c;不会自动弹出浏览器到默认页面 学要写个配置类 首先创建一个资源配置 url.properties openProject.isOpentrue openP…

办公网络上网行为管理规划

办公网络上网行为管理规划是确保办公网络资源的合理利用和保障网络安全的重要措施。下面是办公网络上网行为管理规划的一般性步骤和原则&#xff1a; 确立政策和准则&#xff1a; 制定明确的上网行为管理政策和准则&#xff0c;明确员工在办公网络上的合规要求和行为规范。 包…

ADS 错误 1808可能原因 ADSError 1808

​ 调试问题记录&#xff1a; 背景&#xff1a; Ads调试时遇到错误&#xff0c;返回码是 1808&#xff0c;查询倍福官网 得出1808错误原因是 symbol not found 原因&#xff1a; ADSError: symbol not found (1808). Possible incorrect runtime port selected 可能是ads的地…

【STM32】学习笔记-江科大

【STM32】学习笔记-江科大 1、STM32F103C8T6的GPIO口输出 2、GPIO口输出 GPIO&#xff08;General Purpose Input Output&#xff09;通用输入输出口可配置为8种输入输出模式引脚电平&#xff1a;0V~3.3V&#xff0c;部分引脚可容忍5V输出模式下可控制端口输出高低电平&#…

BI系统框架模型与指标库参考

主数据 &#xff1a;组织|岗位|人员|大区|三大主数据&#xff08;客户、物料、供应商&#xff09;财务主数据&#xff08;科目|成本中心|利润中心|资产&#xff09;|工作中心|工艺路线 业务数据 &#xff1a;线索|业务机会|合同|订单|采购|生产|发货|物流|财务&#xff0…

如何从任何打印机扫描到你的计算机

即使在这个几乎所有东西都是在线和虚拟的时代&#xff0c;你仍然会得到一些实物文档。保存这些文档的最佳方法是扫描它们并将其保存在硬盘、云存储或NAS上。 为此&#xff0c;你需要一台多功能或一体式打印机。然而&#xff0c;这些设备的设置和使用可能会令人困惑。它们通常需…

ARM处理器核心概述

一、基于ARM处理器的嵌入式系统 ARM核深度嵌入SOC中&#xff0c;通过JTAG口进行外部调试。计通常既有外部内存又有内部内存&#xff0c;从而支持不通的内存宽度、速度和大小。一般会包含一个中断控制器。可能包含一些Primece外设&#xff0c;需要从ARM公司取得授权。总线使用A…

工厂人员作业现场异常违规行为识别

工厂人员作业现场异常违规行为识别运用yolov7网络模型框架的图像识别技术&#xff0c;工厂人员作业现场异常违规行为识别工厂人员的行为是否合规SOP流程操作规范&#xff0c;帮助作业人员及时发现并纠正违规行为&#xff0c;确保作业过程的安全和合规性。Yolo意思是You Only Lo…

【已解决】Java 后端使用数组流 Array.stream() 将数组格式的 Cookie 转换成字符串格式

&#x1f389;工作中遇到这样一个场景&#xff1a;远程调用某个接口&#xff0c;该接口需要用户的 Cookie 信息进行权限认证&#xff0c;认证通过之后才可以打通并返回数据。 在后端拿到 httpServletRequest 后&#xff0c;调用 getCookies() 方法&#xff0c;返回的是一个 Coo…

10.物联网LWIP之TCP状态转变

一。TCP状态机 1.青粗线&#xff1a;理想TCP状态转变&#xff08;服务器视角下&#xff09; 2.虚线&#xff1a;被动TCP状态转变&#xff08;服务器视角下&#xff09; 3.细实线&#xff1a;不经常出现的TCP状态转变&#xff08;类似于边界处理&#xff09; 1.青粗线解释--》服…

基于 OV2640 的图像采集显示系统(camera_init 摄像头初始化模块)

此部分可参考IIC系列文章: (1)I2C 接口控制器理论讲解 (2)I2C接口控制设计与实现 (3)I2C连续读写实现 文章目录 前言一、OV2640 摄像头初始化模块设计思路二、OV2640 摄像头初始化模块用法介绍三、复位时序设计四、数据写入操作五、数据查找表六、完整代码展示六、仿真代码展示…

VIT 和Swin Transformer

VIT&#xff1a;https://blog.csdn.net/qq_37541097/article/details/118242600 Swin Transform&#xff1a;https://blog.csdn.net/qq_37541097/article/details/121119988 一、VIT 模型由三个模块组成&#xff1a; Linear Projection of Flattened Patches(Embedding层) Tran…

中文情感分类

本文通过ChnSentiCorp数据集介绍了文本分类任务过程&#xff0c;主要使用预训练语言模型bert-base-chinese直接在测试集上进行测试&#xff0c;也简要介绍了模型训练流程&#xff0c;不过最后没有保存训练好的模型。 一.任务和数据集介绍 1.任务 中文情感分类本质还是一个文本…