CountDownLatch、Semaphore详解——深入探究CountDownLatch、Semaphore源码

news2024/10/5 21:16:52

这篇文章将会详细介绍基于AQS实现的两个并发类CountDownLatch和Semaphore,通过深入底层源代码讲解其具体实现。

目录

CountDownLatch

 countDown()

 await()

Semaphore

 Semaphore类图

 Semaphore的应用场景

 acquire()

 tryAcquire()


CountDownLatch

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 */

上面是CountDownLatch这个API的前两行注释,一句话已经说明了这个类的作用。

一种同步辅助工具,允许一个或多个线程等待其他线程正在执行的一组操作完成。

CountDownLatch是一个计数器,通过的构造方法指定初始计数器的值,调用CountDownLatch的countDown()方法让计数器的值减少1,当计数器的值变成0时,调用它的await()方法阻塞的当前线程会被释放(其实就是从for循环中结束返回),继续执行剩余的代码。

这个类的结构很简单,就不画类图了,直接贴出代码

package java.util.concurrent;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountDownLatch {
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

}

CountDownLatch是通过AQS的实现类Sync来实现这个计数器功能的,通过AQS的state属性保存计数器的值。

因为这里的计数器值state是共享的,所以重写了AQS的tryAcquireShared()和tryReleaseShared()方法。

为什么要重写这两个方法呢?

因为AQS里的几个重要的方法aquire()和release()会根据当前是独占模式还是共享模式去调用对应的tryAcquire()/tryAcquireShared()、tryRelease()/tryReleaseShared()方法。

而Java已经约定了,继承AQS应该指明state属性的语义:

- 在CountDownLatch中,state值表示计数器的值;

- 在Semaphore中,state值表示许可证的数量;

 

接下来,介绍CountDownLatch中的两个最重要的方法:

 countDown()

让计数器的值减1

    public void countDown() {
        sync.releaseShared(1);
    }

 在这个方法里调用了releaseShared()方法,因为静态内部类Sync没有重写这个方法,所以调用的是超类AbstractQueuedSynchronizer的方法。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();

            return true;
        }

        return false;
    }

这个方法里先调用tryReleaseShared()方法,因为Sync重写了这个方法,所以调用的是Sync的方法。

        protected boolean tryReleaseShared(int releases) {

            for (;;) {
                // 获取state值,如果是0,直接返回false
                int c = getState();

                if (c == 0) {
                    return false;
                }

                // 通过Unsafe工具类的CAS方法尝试修改state的值为state - 1
                int nextc = c - 1;

                if (compareAndSetState(c, nextc)) {
                    // 修改完成之后,根据修改之前的state值是否为1返回true或false
                    return nextc == 0;
                }
            }
        }

 await()

阻塞当前线程,直到state值变成0才唤醒。

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

在方法内部调用了AQS的acquireSharedInterruptibly()方法,Interruptibly后缀的方法都可以被中断。

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

在这个方法中,先调用Thread类的静态方法interrupted()判断当前线程是否被中断,如果当前线程中断状态为true,则清除线程的中断标记并返回true。否则返回false。如果当前线程是被中断的状态,则抛出中断异常返回。关于interrupted()方法的详细介绍,请参考文章

interrupt()、interrupted()和isInterruptd()的区别icon-default.png?t=N7T8https://blog.csdn.net/heyl163_/article/details/132138422如果当前线程是正常的状态,调用tryAcquireShared()方法,因为Sync重写了这个方法,所以调用的是Sync的tryAcquireShared()方法。这个方法就是判断当前state属性值是否为0,如果不是0就返回-1,否则返回1

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

继续回到上面的代码,当state不为0时,会执行doAcquireSharedInterruptibly()方法,注意,这里的一个死循环for会让当前线程一直阻塞在这里,直到tryAcquireShared()获取到的返回值大于0,也就是1时才退出循环,而根据上面的方法,此时state值为0。

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;

        try {
            for (;;) {
                final Node p = node.predecessor();

                if (p == head) {
                    int r = tryAcquireShared(arg);

                    if (r >= 0) {
                        setHeadAndPropagate(node, r);

                        p.next = null; // help GC
                        failed = false;

                        return;
                    }
                }

                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

总结:这里的等待是通过无条件的for循环让当前线程一直等待,直到state的值变为0才退出循环返回。所以,在这里可以替代线程的join()方法使用,这也是CountDownLatch的主要用途。

Semaphore

A counting semaphore.Conceptually, a semaphore maintains a set of permits. Each acquire blocks if necessary until a permit is available, and then takes it. Each release adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.

一个计数器信号量,一个Semaphore包含一组许可证的集合,每个获取器都会在必要时阻塞,直到有许可证可用,然后获取它。每次释放都会增加一个许可,从而有可能释放阻塞的获取者。不过, Semaphore 并不使用实际的许可证对象;它只是对可用数量进行计数,并采取相应的行动。

以上是Semaphore的简单介绍,在源码的注释最开头就能看到。。

 Semaphore类图

 Semaphore的应用场景

接下来通过一个案例来介绍Semaphore适用于什么场景

在广州过节和朋友去吃火锅就知道,桌子数量是固定的,一个店就那么大,就只有x桌。去到店里基本上是要排队的,要等取号在自己的号数前面的人吃完出来才能空出来桌位,否则就要在那里一直等。

上面的场景,假如店里刚好坐满,这时候突然来了1对情侣,这时候因为店里的人还没吃完,要等最少一桌的人用完餐之后出来才能轮到这队情侣用餐。

其实这里的操作就是acquire(),店里的桌数就是Semaphore的初始许可数。

 acquire()

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

方法内部调用了以下方法,当线程被中断时,清除中断标记,抛出中断异常。否则,调用tryAcquireShared()方法尝试得到一个许可证。

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

这里会根据调用构造方法指定的公平锁还是非公平锁调用FairSync或NoneFairSync的tryAcquireShared()方法。

如果是使用一个参数的构造方法实例化,则使用默认的非公平锁,否则根据参数fair来决定是否公平锁。

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

以默认的NoneFairSync为例,tryAcquireShared()方法会调用其超类Sync中定义的nonfairTryAcquireShared()方法。

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;

        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

nonfairTryAcquireShared()方法中会一直尝试去扣减AQS中的state值,也就是信号量中的许可证的数量。

回到acquireSharedInterruptibly()方法,当我们尝试扣减state之后state小于0,也就是许可证数量不够了,就会执行doAcquireSharedInterruptibly()方法,这个方法在CountDownLatch里也讲了,主线程会一直在for循环里出不去,相当于阻塞在这里了,直到申请许可证成功,也就是上面的方法返回了不小于0的数。

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();

        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;

        try {
            for (;;) {
                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }

                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 tryAcquire()

尝试获取许可证,如果许可证数量足够,则返回true,否则返回false。

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

好了,文章就分享到这里了,看完不要忘了点赞+收藏哦~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/952557.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Arduino程序设计(六)蜂鸣器实验

蜂鸣器实验 前言一、蜂鸣器简介1、蜂鸣器的工作原理2、有源蜂鸣器和无源蜂鸣器的区别3、蜂鸣器模块电路原理图 二、有源蜂鸣器实验有源蜂鸣器控制 三、无源蜂鸣器实验1、调节蜂鸣器输出频率2、无源蜂鸣器触发报警声3、无源蜂鸣器播放音乐 参考资料 前言 本文主要介绍两种蜂鸣器…

基于jeecg-boot的flowable流程审批时增加下一个审批人设置

更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a; https://gitee.com/nbacheng/nbcio-boot 前端代码&#xff1a;https://gitee.com/nbacheng/nbcio-vue.git 在线演示&#xff08;包括H5&#xff09; &#xff1a; http://122.227.135.243:9888 因为有时…

工服穿戴检测算法 工装穿戴识别算法

工服穿戴检测算法 工装穿戴识别算法利用yolo网络模型图像识别技术&#xff0c;工服穿戴检测算法 工装穿戴识别算法可以准确地识别现场人员是否穿戴了正确的工装&#xff0c;包括工作服、安全帽等。一旦检测到未穿戴的情况&#xff0c;将立即发出警报并提示相关人员进行整改。Yo…

python中的模块和包

模块 模块就是一个Python文件&#xff0c;里面有类、函数、变量等&#xff0c;我们可以拿过来用&#xff08;导入模块去使用&#xff09; 模块的导入方式 模块在使用前需要先导入。导入的语法如下: 常用的组合形式如&#xff1a; import 模块名from 模块名 import 类、变量、…

JavaScript闭包漏洞与修补措施

请先看下面一段代码 var obj (function () {var sonObj {a: 1,b: 2}return {getObj: function (v) {return sonObj[v]}}})()可以看出,这是一段很典型的js闭包代码,可以通过obj调用get方法传一个参数,如果传的是a就可以得到闭包内的对象sonObj.a var obj (function () {var s…

企业合规改革如何进行?

企业合规是现代商业运作中的重要议题&#xff0c;合规改革是促使企业适应法规变化、规范经营行为的必经之路。本文将介绍企业合规改革的步骤和方法&#xff0c;帮助企业建立健全的合规机制&#xff0c;增强竞争力&#xff0c;并赢得社会信任。 一、评估和分析 合规改革的第一…

明厨亮灶监控实施方案 opencv

明厨亮灶监控实施方案通过pythonopencv网络模型图像识别算法&#xff0c;一旦发现现场人员没有正确佩戴厨师帽或厨师服&#xff0c;及时发现明火离岗、不戴口罩、厨房抽烟、老鼠出没以及陌生人进入后厨等问题生成告警信息并进行提示。OpenCV是一个基于Apache2.0许可&#xff08…

eureka服务注册和服务发现

文章目录 问题实现以orderservice为例orderservice服务注册orderservice服务拉取 总结 问题 我们要在orderservice中根据查询到的userId来查询user&#xff0c;将user信息封装到查询到的order中。 一个微服务&#xff0c;既可以是服务提供者&#xff0c;又可以是服务消费者&a…

六、事务-1.简介

一、简介 例&#xff1a;张三转账1000元给李四 step1&#xff1a;查询张三账户余额是否有2000元 step2&#xff1a;若张三账户余额有2000元&#xff0c;张三账户余额-1000 step3&#xff1a;李四账户余额1000 若step2执行成功&#xff0c;step3执行失败&#xff0c;此时需要…

ToBeWritten之VSOC安全运营

也许每个人出生的时候都以为这世界都是为他一个人而存在的&#xff0c;当他发现自己错的时候&#xff0c;他便开始长大 少走了弯路&#xff0c;也就错过了风景&#xff0c;无论如何&#xff0c;感谢经历 转移发布平台通知&#xff1a;将不再在CSDN博客发布新文章&#xff0c;敬…

【ES】illegal_argument_exception“,“reason“:“Result window is too large

查询ES数据返回错误&#xff1a; {"root_cause":[{"type":"illegal_argument_exception","reason":"Result window is too large, from size must be less than or equal to: [10000] but was [999999]. See the scroll api for…

直播预告|博睿学院第四季即将开讲:博睿数据资深运维团队现身说法!

博睿学院第四季开讲啦&#xff01;本季博睿学院的课程将于本周四&#xff08;8月31日&#xff09;16点正式启动。本季我们邀请到了博睿数据平台支撑中心的四位资深运维专家现身说法&#xff0c;来为我们分享一体化智能可观测平台Bonree ONE的实践干货。 他们&#xff0c;见多识…

Gradio使用介绍

与他人分享你的机器学习模型、API或数据科学工作流的最佳方式之一&#xff0c;就是创建一个交互式应用程序&#xff0c;让用户或同事可以在他们的浏览器中尝试演示,Gradio是创建提供了用非常方便的方式快速创建一个前端交互应用&#xff0c;那如何使用Gradio呢&#xff1f;因为…

element侧边栏子路由点击不高亮问题

最近自己封装侧边栏 又碰到了点击子路由不高亮的问题 <template><div class"aside"><el-scrollbar :vertical"true" class"scrollbar_left_nav"><el-menu :default-active"defaultActive" :collapse"$stor…

MySQL高级篇_13_事务基础知识_尚硅谷_宋红康

MySQL高级篇_事务基础知识 1. 数据库事务概述1.1 存储引擎支持情况1.2 基本概念1.3 事务的ACID特性原子性&#xff08;atomicity&#xff09;一致性&#xff08;consistency&#xff09;隔离性&#xff08;isolation)持久性&#xff08;durability&#xff09; 1.4 事务的状态 …

Openlayers 教程 - 获取地图中心点、范围以及缩放等级

Openlayers 教程 - 获取地图中心点、范围以及缩放等级 核心代码完整代码&#xff1a;在线示例 之前项目中需要实时获取地图中心点以及范围&#xff0c;难度不高&#xff0c;为了方便使用&#xff0c;这里记录分享一下。 本文包括核心代码、完整代码以及在线示例。 核心代码 //…

Consul原理介绍

官方文档&#xff1a;https://www.consul.io/docs Raft动画演示&#xff1a;http://thesecretlivesofdata.com/raft/ 注册中心对比 Consul特点 服务发现、健康检查、Key/Value存储、安全服务通信&#xff08;TLS证书&#xff09;、多数据中心 架构 角色 数据中心 数据中心内…

windows 中pycharm中venv无法激活

1.用管理员身份打开Windows PowerShell 2.进入项目的&#xff1a;venv\Scripts 如&#xff1a;D: (1): cd .\project\venv\Scripts\ (2): 执行命令&#xff1a; Set-ExecutionPolicy RemoteSigned (3): 选择&#xff1a;Y (4): .\activate

使用生成式 AI 和 Amazon Kendra 实现企业规模的图像字幕创建和搜索

Amazon Kendra 是一个由机器学习(ML)驱动的智能搜索服务。Amazon Kendra 重新构想了您的网站和应用程序的搜索功能,以便您的员工和客户可以轻松地找到散布在您组织内多个位置和内容存储库中的内容。 亚马逊云科技开发者社区为开发者们提供全球的开发技术资源。这里有技术文档、…

Linux启动黑屏卡住Logo登录界面无法进入系统的终极解决方式

Linux启动黑屏 卡住Logo登录界面无法进入系统的终极解决方式 "⮫适用于ubuntu系统⮨" 起因经过方法1--甲说方法2--乙说方法3--丙说方法4--丁说 结果中途误删ubuntu的etc文件如何恢复于是&#xff0c;我重装了系统&#xff0c;没想到有意外收获&#xff08;中途还把新…