线程等待其他线程执行同步类CountDownLatch

news2024/11/16 13:47:28

文章目录

    • 前言
    • 核心原理
    • 源码解析
      • 同步源码分析
      • await源码分析
      • countDown源码分析
    • 实战演示
      • 1、创建演示代码
      • 2、创建测试用例
      • 3、测试结果演示
    • 写在最后

前言

大家都知道多线程在我们实际编码过程中运用很多,很多情况我们需要靠多线程来提升系统性能。但是有些时候我们需要阻塞一部分线程,让这部分线程等待其他线程的执行完成获取结果。比如:数据统计、等待其他任务完成、死锁检查等等。为了应对这些场景,我们JUC提供了CountDownLatch类,这个类可以帮助我们在多线程的使用中让一个或多个线程等待其他线程执行完成。

核心原理

CountDownLatch内部维护了计数器和阻塞队列,计数器缓存count值,也是有多个需要先执行的线程数目;阻塞队列缓存的是调用了await()方法的线程,表示加入阻塞队列。当CountDownLatch初始化的时候会传入业务需要先执行的线程数目count值,后续线程调用await()方法则加入阻塞队列等待其他线程执行完成。countDown()方法则是在某个线程业务完成后将计数器count值减一,表示这个线程已经执行。如果所有先行线程执行完成,最终count值等于0,此时CountDownLatch就会唤醒阻塞队列中的线程继续执行。

源码解析

同步源码分析

CountDownLatch依赖关系图
在这里插入图片描述

进入JUC下CountDownLatch类,发现其内部类Sync集成AbstractQueuedSynchronizer抽象阻塞队列:

//内部类sync集成AQS
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

继续查看源码:

//有参数的构造方法
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}


//线程加入阻塞队列方法
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}


//线程执行完成计数器减少方法
public void countDown() {
    sync.releaseShared(1);
}

如上源码所示,我们发现CountDownLatch的主要方法await()、countDown()、构造方法都是调用Syn内部类中的方法。然而Sync内部类继承了AQS,所以本质上我们CountDownLatch是通过AQS来实现同步的。

AQS原理的内部通过state状态和阻塞队列实现的,当 state == 0 唤醒阻塞队列中线程,对AQS还不是很了解的同学可以参考我的博客:并发基础之抽象同步队列AQS

await源码分析

上面我们分析了CountDownLatch同步是通过AQS实现的,现在我们详细分析下CountDownLatch的核心方法await()方法。

查询await()方法源码,发现CountDownLatch提供了两个让线程加入阻塞队列的方法,一个直接中断加入阻塞,另一个带有阻塞超时时间加入阻塞队列。

/**
 * 当前线程中断执行,加入阻塞队列
ublic void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

/**
 * 当前多线程中断加入阻塞队列,加入了阻塞超时时间
 */
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

继续查看源码:

//AQS执行中断加入阻塞前验证
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
//AQS执行中断加入阻塞队列
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                //获取状态值
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //直接作为队列头
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //放入阻塞队列尾部
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

如上源码所示,CountDownLatch的await()方法本质是调用AQS中的阻塞方法。内部先验证阻塞队里是否有线程,如果没有则作为阻塞队列头部,倘若存在阻塞线程则加入到阻塞队列尾部。

countDown源码分析

上面分析了await()方法内部是调用AQS阻塞逻辑,不出意外countDown()方法应该也是调用的AQS处理释放线程锁的方法。

我们查询countDown源码:

//countDownLatch释放共享锁
public void countDown() {
    sync.releaseShared(1);
}
//AQS释放共享锁验证
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        //释放共享锁验证通过
        doReleaseShared();
        return true;
    }
    return false;
}

//countdownlatch内部类sync 覆写tryReleaseShared 方法
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            //只有state == 0 时候会释放共享锁
            return nextc == 0;
    }
}

如上源码所示,countDown()方法内部调用AQS的releaseShared()释放共享锁方法。在释放锁之前会验证是否满足释放条件,此时调用的是我们sync 覆写的tryReleaseShared()方法,这个方法逻辑是获取当前state值并用CAS修改state减一。然后判断state是否等于0,不等于0则不通过释放共享锁验证,等于0则通过验证。

我们继续查释放共享锁源:

//释放共享锁具体逻辑
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //唤醒后续节点
                unparkSuccessor(h);
            }
            //用CAS重新设置state值
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
/**
 * 唤醒后续阻塞线程方法
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //唤醒线程
        LockSupport.unpark(s.thread);
}

如上源码所示,doReleaseShared()方法释放了共享锁,释放完成发现阻塞队列有阻塞线程会唤醒线程。

实战演示

上面我们讲些了CountCownLatch的同步方式,以及主要方法的源码解析。相信大家对CountDownLatch都有了一定的了解,以下我们让主线程等待5个线程执行完成后再执行实战演示。

1、创建演示代码

/**
 * CountDownLatch运用
 * @author senfel
 * @version 1.0
 * @date 2023/4/20 14:58
 */
public class CountDownDemo {

    public static void actionCountDown() throws Exception{
        //创建一个定长线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //主线程等待5个线程执行完才执行
        CountDownLatch countDownLatch = new CountDownLatch(5);
        //并发量控制在2个线程
        Semaphore semaphore = new Semaphore(2);
        //创建线程执行数据
        for(int i=0;i<5;i++){
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.err.println("线程:"+Thread.currentThread().getName()+"开始执行");
                        Thread.sleep(5000);
                        System.err.println("线程:"+Thread.currentThread().getName()+"执行完成");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                    semaphore.release();
                }
            });
        }
        countDownLatch.await();
        System.err.println("主线程执行");
    }
}

2、创建测试用例

@SpringBootTest
class DemoApplicationTests {

    /**
     * countDownLatchTest
     * @throws Exception
     */
    @Test
    public void countDownLatchTest() throws Exception{
        CountDownDemo.actionCountDown();
    }
}

3、测试结果演示

线程:pool-1-thread-1开始执行
线程:pool-1-thread-4开始执行
线程:pool-1-thread-1执行完成
线程:pool-1-thread-4执行完成
线程:pool-1-thread-3开始执行
线程:pool-1-thread-2开始执行
线程:pool-1-thread-3执行完成
线程:pool-1-thread-2执行完成
线程:pool-1-thread-5开始执行
线程:pool-1-thread-5执行完成
主线程执行

解释:
countDownLatch 保证主线程最后执行,其他子线程用semaphore 保证并发数目。

写在最后

本文我们讲述了CountDownLatch 是通过AQS抽象阻塞队列实现同步功能的,又说到了awat()方法本质是调用AQS加入阻塞队列的方法,countDown本质是调用AQS释放共享锁的方法。对于最很重要的唤醒阻塞线程逻辑,也是通过CountDownLatch 覆写tryReleaseShared()方法达到满足释放共享锁条件的,在AQS释放共享锁中如果阻塞队列存在阻塞线程会唤醒线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/439136.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flink系列-7、Flink DataSet—Sink广播变量分布式缓存累加器

版权声明&#xff1a;本文为博主原创文章&#xff0c;遵循 CC 4.0 BY-SA 版权协议&#xff0c;转载请附上原文出处链接和本声明。 大数据系列文章目录 官方网址&#xff1a;https://flink.apache.org/ 学习资料&#xff1a;https://flink-learning.org.cn/ 目录 数据输出Da…

图的简单处理(C/C++)

目录 1 存图方法 1.1 邻接矩阵 1.2 邻接表 1.3 链式前向星 2 树形DP 2.1 简介 2.2 例题1&#xff1a;公司聚会 2.3 例题2&#xff1a;士兵部署 2.4 例题3&#xff1a;强力党逗志芃 2.5 例题4&#xff1a;作物杂交&#xff08;不确定树的结构&#xff09; …

N1盒子使用U外挂HomeAssistant

目录 感谢1.准备工作2.将固件写入U盘&#xff08;32G&#xff09;3.将N1设为U盘启动4.将U盘插入到HDMI旁边的USB接口上5.网页打开HomeAssistant 感谢 HomeAssistant智能家居方案 配套教程 视频 源码 硬件 智能家庭HomeAssistant N1 4月全集成固件 小米米家涂鸦HASS nodered m…

Python图像处理【12】基于小波变换执行图像去噪

基于小波变换执行图像去噪 0. 前言1. 小波变换基础2. 小波变换去噪原理3. 使用 pywt 执行小波变换图像去噪4. 使用 scikit-image 执行小波变换图像去噪4.1 循环旋转技术4.2 改进图像去噪质量 小结系列链接 0. 前言 小波 (wavelets) 变换是表示和分析多分辨率图像的通用方法&am…

上海震坤行工业超市建设数字化采购供应链的实践

上海震坤行工业超市建设数字化采购供应链的实践 对客户而言&#xff0c;MRO工业用品采购一直存在着SKU繁杂、紧急需求多、计划性不强、库存难以管理等特点。有企业统计&#xff0c;MRO零星采购金额占其全类目采购总金额的2%&#xff0c;但是用于管理的时间精力却占到了总体的6…

[使用指南]在使用MyEclipse时如何添加 更新插件

MyEclipse v2022.1.0正式版下载 通过Eclipse市场目录或各种更新站点类型添加插件&#xff0c;可以定制你的MyEclipse IDE(或Angular IDE)。 一、从目录中添加插件 添加额外插件最简单方法是通过Eclipse Marketplace目录。 1. 要打开目录&#xff0c;请从Catalog中选择Help&…

腾讯云服务器网络收发包PPS是什么?性能介绍

什么是网络收发包PPS&#xff1f;云服务器网络收发包PPS多少合适&#xff1f;网络收发包PPS是指云服务器每秒可以处理的网络数据包数量&#xff0c;单位是PPS即packets per second每秒发包数量。云服务器吧来详细说下阿里云服务器网络收发包PPS和腾讯云网络收发包性能参数表&am…

k8s安装部署apollo配置中心

一、文章大纲 二、安装MySQL5.7 三、创建apollo-config 四、创建apollo-admin 五、创建apollo-portal 六、查看apollo各个组件服务状态 七、访问apollo 八、nginx代理配置转发#注意 一定要先启动apollo-config&#xff0c;再启动apollo-admin&#xff0c;最后启动apollo-porta…

什么才是好CDN

选择一种领先于网络和移动技术不断进步以及不断演变的威胁格局的CDN&#xff0c;将使您能够始终如一地为客户提供尽可能好的在线体验&#xff0c;同时最大限度地降低运营复杂性和管理成本。 但问题来了&#xff1a;什么才是最好的CDN&#xff1f; 这个问题的唯一答案是&#x…

HBase高手之路7—HBase之全文检索Phoneix

文章目录 HBase之全文检索Phoenix一、全文检索二、全文检索工具phoenix简介1. 简介2. 使用Phoenix是否会影响HBase性能3. 哪些公司在使用Phoenix4. 官方性能测试4.1 Phoenix对标Hive&#xff08;基于HDFS和HBase&#xff09;4.2 Phoenix对标Impala4.3 关于上述官网两张性能测试…

python基础案例题:进制转换、字符串加密的实现、猜拳游戏、多种方法计算π

目录 前言1.进制转换2.字符串加密的实现3.猜拳游戏4.多种方法计算π尾语 &#x1f49d; 前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! 1.进制转换 功能&#xff1a; 获取十进制整数的二进制串&#xff0c;相当于内置函数bin。 算法分析&#xff1a; 对2辗转相除&…

pyecharts从入门到精通-地图专题Map-世界地图和中国城市地图

文章目录 参考安装与查看pyecharts地图实现-Geo数据集查看读取数据生成中文国家名称国家中文和英文名称字典:根据字典&#xff0c;生成国家中文名查看没有转换成功的国家中文有哪些过滤包含中文国家名的数据 可视化人口数据显示中国城市地图数据 拓展-pyecharts中Map源码拓展-p…

开放耳机有什么优缺点,推荐几款不错的开放式耳机

​由于骨传导耳机可以保持耳朵的开放&#xff0c;长时间佩戴不会有闷热感&#xff0c;同时可以在运动中保持安全&#xff0c;因此被越来越多的人接受。在目前市面上骨传导耳机品牌众多&#xff0c;价格从几十元到上千元不等&#xff0c;为了让大家更好地挑选适合自己的骨传导耳…

java获取两个日期之间的所有日期

1、获取日期的过程&#xff1a; 首先创建一个数组&#xff0c;然后在数组的末尾加上一个日期。当我们有两个日期时&#xff0c;可以把这两个日期当做是不同的数组&#xff0c;然后使用 next函数把这两个日期之间的所有时间都获取出来。 2、代码演示&#xff1a; 3、实现效果&am…

bat批处理文件无法执行

执行后弹出cmd窗口&#xff0c;但里面命令未执行 方案一&#xff1a; 1、打开开始菜单——控制面板 2、点击系统和安全——系统 3、点击左上角的“高级系统设置” 4、切换到“高级”选项卡&#xff0c;点击下方的“环境变量” 5、在用户变量下方点击“新建”&#xff0c;…

3.2.3队列的链式实现

队列的链式实现 注意声明队头指针和队尾指针作用 (1)插入节点的时候只要rear指针指向结点的next指针指向的位置&#xff1b; (2)删除只要front指指针指向的头节点next指针指向的位置删除&#xff1b; &#xff08;1&#xff09;初始化&#xff08;带头结点&#xff09; 初试化…

C++:多态的底层实现原理 -- 虚函数表

目录 一. 多态的原理 1.1 虚函数表 1.2 多态的实现原理 1.3 动态绑定与静态绑定 二. 多继承中的虚函数表 2.1 虚函数表的打印 2.2 多继承中虚函数表中的内容存储情况 一. 多态的原理 1.1 虚函数表 对于一个含有虚函数的的类&#xff0c;在实例化出来对象以后&#xff0…

Docker部署SpringBoot+Vue项目

1.项目部署规划 1.后端多模块项目blog以及各模块运行端口&#xff1a; 前台服务模块sangeng-blog->7777&#xff0c;后台服务模块sangeng-admin->8989&#xff0c;公共模块sangeng-framework 2.前端前台Vue项目&#xff1a;sg-blog-vue->80 3.前端后台Vue项目&#x…

如何在AWS EKS上部署安装nginx ingress controller

Ingress Controller Ingress Controller 通常是一个负载均衡器&#xff0c;用于将外部流量路由到您的 Kubernetes 集群&#xff0c;并负责 L4-L7 网络服务 Ingress controller 仅覆盖 L7 流量&#xff0c;而入口重新路由 HTTP 和 HTTPS 流量 Type of Ingress Controllers C…

QT编程集成环境在Ubuntu中如何使用ROS工程?

文章目录 0.引言1.安装Qt Creator&#xff08;带ROS插件&#xff09;2.创建ROS工程3.创建功能包4.创建节点5.添加编译规则6.编译运行 0.引言 在进行ROS开发过程中&#xff0c;会创建许多功能包和源代码文件&#xff0c;这些文件少量时&#xff0c;手动管理还能接受&#xff0c;…