CountDownLatch与CyclicBarrier原理剖析

news2024/12/22 20:19:34

1.CountDownLatch

1.1 什么是CountDownLatch

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

1.2 CountDownLatch与join

使用join同样可以达到线程同步的效果,但是调用thread.join() 方法必须等thread 执行完毕,当前线程才能继续往下执行,而CountDownLatch通过计数器提供了更灵活的控制,只要检测到计数器为0当前线程就可以往下执行而不用管相应的thread是否执行完毕。

而且如果我们使用线程池的话,就没有办法直接调用线程的join方法了。所有另一方面来说,CountDownLatch要比join更加灵活。

1.3 CountDownLatch的基本使用

public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
    
        CountDownLatch latch = new CountDownLatch(5);

        for (int i=0; i<=5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 运行");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                }
            }).start();
        }

        System.out.println("等待子线程运行结束");
        latch.await();
        System.out.println("子线程运行结束");
    }
}

这里主线程会阻塞在latch.await(),直到CountDownLatch技术为0。

1.4 CountDownLatch原理剖析

CountDownLatch类图

请添加图片描述

从上面我们可以直到Sync继承了AQS类,CountDownLatch又持有一个成员变量Sync,所有我们可以直到CountDownLatch是基于AQS实现的。

通过构造方法我们又可以得知CountDownLatch计数器的值赋给了AQS的state变量。

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
    setState(count);
}

await()

当线程调用await方法以后,当前线程会被阻塞,直到下面情况之一时才会返回:

  • 当所有的线程都调用了CountDownLatchcountDown方法后,也就是计数器为0时
  • 其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出异常然后返回
// CountDownLatch的await()实现
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AQS中获取共享资源可被中断的方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 判断当前线程是否已被中断,如果是则抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // state不为0(意味着CountDownLatch还没有减到0)
    // 则执行AQS的doAcquireSharedInterruptibly方法,让当前线程进入AQS队列
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// CountDownLatch中的Sync中tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
    // 当前state如果为0则返回1否则返回-1
    return (getState() == 0) ? 1 : -1;
}

由上述代码我们可以知道线程获取资源时可以被中断,并且获取的是共享资源。

名为await的方法还有一个,不过多个参数,也就是指定时间后,调用await(long timeout, TimeUnit unit)的线程会超时而返回false,如果是正常返回的,那么返回值就为true。

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

countDown()

线程调用该方法以后,计数器的值会递减,递减后如果计数器的值为0,那么就会唤醒所有因调用await方法而阻塞的线程,否则什么都不做。

// CountDownLatch中的countDown方法
public void countDown() {
    sync.releaseShared(1);
}

// AQS中的方法
public final boolean releaseShared(int arg) {
    // 调用Sync实现的tryReleaseShared方法
    if (tryReleaseShared(arg)) {
        // AQS中释放资源的方法
        doReleaseShared();
        return true;
    }
    return false;
}


// Sync中重写的tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
    // 循环进行CAS操作,将state做减一操作,失败则一直重试
    for (;;) {
        // 获得当前的state变量
        int c = getState();
        // 如果state已经等于0,那么直接返回false
        if (c == 0)
            return false;
 		// 将state-1
        int nextc = c-1;
        // CAS操作修改state,成功以后判断state是否已经为0,为0则返回true,
        // 再接下来就会调用AQS中的doReleaseShared方法释放资源
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

2.CyclicBarrier

2.1 什么是CyclicBarrier

从字面理解,CyclicBarrier就是回环屏障的意思,它可以让一组线程达到一个状态后再同时执行。

  • 回环的意思是在所有线程执行完毕以后,会重置CyclicBarrier的状态使它可以被重用
  • 屏障的意思是线程调用await方法后都会被阻塞,这个阻塞点就被称为屏障,等到所有屏障都调用了await方法后,线程们就会冲破屏障继续向下运行

2.2 CyclicBarrier的基本使用

public class CycleBarrierTest {

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
        // 当计数器为0时,立即执行
        @Override
        public void run() {
            System.out.println("汇总线程:" + Thread.currentThread().getName() + " 任务合并。");
        }
    });

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 将线程A添加到线程池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("线程A:" + Thread.currentThread().getName() + "执行任务。");
                    System.out.println("线程A:到达屏障点");
                    cyclicBarrier.await();
                    System.out.println("线程A:退出屏障点");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 将线程B添加到线程池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("线程B:" + Thread.currentThread().getName() + "执行任务。");
                    System.out.println("线程B:到达屏障点");
                    cyclicBarrier.await();
                    System.out.println("线程B:退出屏障点");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 调用线程池的shutdown方法关闭线程池
        // 该方法会使线程池从RUNNING状态转变为SHUTDOWN状态
        // SHUTDOWN状态意味着:不再接收新的任务,但是会对任务队列中的任务进行处理
        executorService.shutdown();
    }
}

执行结果为:

线程A:pool-1-thread-1执行任务。
线程A:到达屏障点
线程B:pool-1-thread-2执行任务。
线程B:到达屏障点
汇总线程:pool-1-thread-2 任务合并。
线程B:退出屏障点
线程A:退出屏障点

上面的例子说明了多个线程之间是相互等待的,假如计数器值为N,那么随后调用 await 方法的 N–1 个线程都会因为到达屏障点而被阻塞,当第 N 个线程调用 await 后,计 数器值为 0 了,这时候第 N 个线程才会发出通知唤醒前面的 N–1 个线程。也就是当全部 线程都到达屏障点时才能一块继续向下执行。不过这个例子并没有体现出可重用性,不过这个其实也很好理解,就是可以反复使用,感兴趣的同学可以自己去了解一下。

2.3 CyclicBarrier原理剖析

CyclicBarrier类图

请添加图片描述

由类图可知,CyclicBarrier基于独占锁实现,本质还是基于AQS实现的。

  • Generation内部有一个变量broken,用来记录当前屏障是否被打破,因为内部使用重入锁保证了线程安全,所以该属性不需要使用volatile修饰

  • parties用来记录线程个数,意味着parties个线程调用await方法以后,才会“冲破屏障

  • count一开始等于parties,每当有线程调用await方法就会减一,当count为零就意味着所有线程到达了屏障点

使用两个变量的原因就是为了达成CyclicBarrier的复用性,当count计数为0以后,会将parties重新赋值给count,从而进行复用。

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

同时通过构造函数我们也可以直到,我们可以传递一个任务,而这个任务的执行时机是当所有的线程都到达屏障点以后。

await()

当线程调用await方法被阻塞,直到满足以下条件之一时就会返回:

  • parties个线程调用了await方法,也就是所有线程都到达了屏障点
  • 其他线程调用了该线程的interrupt方法中断了该线程
  • 与当前屏障点关联的Generation对象的broken标志被设置为true时
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 调用dowait方法阻塞当前线程,第一个参数为false表示第二个参数不生效
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    // 指定timeout后会自动返回
    return dowait(true, unit.toNanos(timeout));
}

dowait()

该方法实现了CyclicBarrier的核心功能。

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 获得锁并进行加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // 如果index=0就说明所有的线程都到达了屏障点,此时开始执行初始化传递的任务barrierAction
        int index = --count;
        if (index == 0) { 
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // 执行任务
                if (command != null)
                    command.run();
                ranAction = true;
                // 激活其他因调用await阻塞的线程,并且重置了CyclicBarrier
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
        
        // 如果index!=0
        for (;;) {
            try {
                // 没有设置超时时间的操作
                if (!timed)
                    trip.await();
                // 设置了超时时间的操作
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation)
                return index;
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}

// 看到这里你应该就明白了为什么CyclicBarrier可以被复用
private void nextGeneration() {
    // 唤醒条件队列中的阻塞线程
    trip.signalAll();
    // 重置CyclicBarrier
    count = parties;
    generation = new Generation();
}

3.CountDownLatch与CyclicBarrier

  • CountDownLatch计数器不能重置,CyclicBarrier可以重置循环利用。
  • CountDownLatch是基于AQS的共享模式实现的,CyclicBarrier是基于ReentrantLockCondition实现的。
  • 两者最大的区别是,进行下一步动作的动作实施者是不一样的。这里的“动作实施者”有两种,一种是主线程(即执行main函数),另一种是执行任务的其他线程,后面叫这种线程为“其他线程”,区分于主线程。对于CountDownLatch,当计数为0的时候,下一步的动作实施者是main函数;对于CyclicBarrier,下一步动作实施者是“其他线程”。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/372284.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式算法 - Snowflake算法

Snowflake&#xff0c;雪花算法是由Twitter开源的分布式ID生成算法&#xff0c;以划分命名空间的方式将 64-bit位分割成多个部分&#xff0c;每个部分代表不同的含义。这种就是将64位划分为不同的段&#xff0c;每段代表不同的涵义&#xff0c;基本就是时间戳、机器ID和序列数。…

注意力机制详解系列(二):通道注意力机制

&#x1f468;‍&#x1f4bb;作者简介&#xff1a; 大数据专业硕士在读&#xff0c;CSDN人工智能领域博客专家&#xff0c;阿里云专家博主&#xff0c;专注大数据与人工智能知识分享。 &#x1f389;专栏推荐&#xff1a; 目前在写CV方向专栏&#xff0c;更新不限于目标检测、…

锁屏面试题百日百刷-Hive篇(三)

锁屏面试题百日百刷&#xff0c;每个工作日坚持更新面试题。锁屏面试题app、小程序现已上线&#xff0c;官网地址&#xff1a;https://www.demosoftware.cn/#/introductionPage。已收录了每日更新的面试题的所有内容&#xff0c;还包含特色的解锁屏幕复习面试题、每日编程题目邮…

函数库Rollup构建优化

本节涉及的内容源码可在vue-pro-components c7 分支[1]找到&#xff0c;欢迎 star 支持&#xff01;前言本文是基于ViteAntDesignVue打造业务组件库[2] 专栏第 8 篇文章【函数库Rollup构建优化】&#xff0c;在上一篇文章的基础上&#xff0c;聊聊在使用 Rollup 构建函数库的过…

【从零开始制作 bt 下载器】一、了解 torrent 文件

【从零开始制作 bt 下载器】一、了解 torrent 文件写作背景了解 torrent 文件认识 bencodepython 解析 torrent 文件解密 torrent 文件结尾写作背景 最先开始是朋友向我诉说使用某雷下载结果显示因为版权无法下载&#xff0c;找其他的下载器有次数限制&#xff0c;于是来询问我…

Redis学习笔记(二)Redis基础(基于5.0.5版本)

一、Redis定位与特性 Redis是一个速度非常快的非关系数据库&#xff08;non-relational database&#xff09;&#xff0c;用 Key-Value 的形式来存储数据。数据主要存储在内存中&#xff0c;所以Redis的速度非常快&#xff0c;另外Redis也可以将内存中的数据持久化到硬盘上。…

[SSD综述 1.3] SSD及固态存储技术半个世纪发展史

在我们今天看来&#xff0c;SSD已不再是个新鲜事物。这多亏了存储行业的前辈们却摸爬滚打了将近半个世纪&#xff0c;才有了SSD的繁荣&#xff0c; 可惜很多前辈都没有机会看到。所有重大的技术革新都是这样&#xff0c;需要长期的技术积累&#xff0c;一代一代的工程师们默默的…

华为OD机试用Python实现 -【狼羊过河 or 羊、狼、农夫过河】(2023-Q1 新题)

华为OD机试题 华为OD机试300题大纲狼羊过河 or 羊、狼、农夫过河题目描述输入描述输出描述说明示例一输入输出说明Python 代码实现代码实现思路华为OD机试300题大纲 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华为 OD 清单查看地址…

学到贫血之-贫血模型和充血模型

学习自&#xff1a;设计模式之美 1 基于贫血模型的传统开发模式 // ControllerVO(View Object) public class UserController {private UserService userService; //通过构造函数或者IOC框架注入public UserVo getUserById(Long userId) {UserBo userBo userService.getUser…

【华为OD机试模拟题】用 C++ 实现 - 相对开音节(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 获得完美走位(2023.Q1) 文章目录 最近更新的博客使用说明相对开音节题目输入输出示例一输入输出说明示例二输入输出说明Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高…

【云原生】搭建k8s高可用集群—20230225

文章目录多master&#xff08;高可用&#xff09;介绍高可用集群使用技术介绍搭建高可用k8s集群步骤1. 准备环境-系统初始化2. 在所有master节点上部署keepalived3.1 安装相关包3.2 配置master节点3.3 部署haproxy错误解决3. 所有节点安装Docker/kubeadm/kubelet4. 部署Kuberne…

《痞子衡嵌入式半月刊》 第 72 期

痞子衡嵌入式半月刊&#xff1a; 第 72 期 这里分享嵌入式领域有用有趣的项目/工具以及一些热点新闻&#xff0c;农历年分二十四节气&#xff0c;希望在每个交节之日准时发布一期。 本期刊是开源项目(GitHub: JayHeng/pzh-mcu-bi-weekly)&#xff0c;欢迎提交 issue&#xff0c…

【华为OD机试模拟题】用 C++ 实现 - 求解连续数列+和最大子矩阵(2023.Q1 双倍快乐)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 获得完美走位(2023.Q1) 文章目录 最近更新的博客使用说明求解连续数列题目输入输出描述示例一输入输出Code和最大子矩阵题目输入输出示例一输入输出说明

CMU15-445 Project.0总结

在线测试 本地测试 Project #0 - C Primer 以下是Project #0的网址&#xff0c;2022FALL的Project #0本质上是实现一棵字典树&#xff0c;关于字典树的相关内容可以参考C实现字典树。 在本题中&#xff0c;为了存储对应着字符串的任意类型值&#xff0c;题目设计了一个Tri…

CV——day79 读论文:基于小目标检测的扩展特征金字塔网络

Extended Feature Pyramid Network for Small Object DetectionI. INTRODUCTIONII. RELATED WORKA. 深层物体探测器B. 跨尺度特征C. 目标检测中的超分辨率III. OUR APPROACHA. 扩展特征金字塔网络B. 特征纹理传输C. 交叉分辨蒸馏IV. EXPERIMENTSA. Experimental Settings1&…

SEATA是什么?它的四种分布式事务模式

一、SEATA是什么&#xff1f; Seata 是一款开源的分布式事务解决方案&#xff0c;致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式&#xff0c;为用户打造一站式的分布式解决方案。 在继续学习使用SEATA之前&#xff0c;对s…

电子科技大学数据库与软件工程实验五

适用于网工和物联网专业 期末考试会考 目录 一、实验目的 二、实验内容 三、实验软件 四、实验步骤及数据记录 1. 权限管理 2. 数据库备份 3. 生成 AWR 报告 五、实验结论及思考题 六、总结及心得体会 七、对本实验过程及方法、手段的改进建议 一、实验目的 1、掌握…

工作九年的机器人讲师收入如何

如下都是个人情况&#xff0c;供各位朋友参考&#xff0c;仅为个人情况&#xff0c;只代表我个人。为何写这样一篇博客&#xff1a;看了最近的热点我在一所普普通通的不知名高校工作了九年。如果问及我存款……我实在是非常非常惭愧的。真实数值如下&#xff08;组合贷&#xf…

【数据库】第八章 数据库编程

第八章 数据库编程 8.1 嵌入式SQL&#xff08;C语言版&#xff09; 被嵌入的语言&#xff08;java ,C)等被称为宿主语言&#xff0c;简称主语言 当主语言 为 C 语言的时候 ​ 语法格式为 EXEC SQL <SQL语句>当主语言为java的时候 格式为 #SQL {<SQL语句>}以下讲解…

MATLAB | 如何解决实验数据散点图重叠问题(overlap)

本期部分实验效果&#xff1a; 这期讲一下如果数据重合严重该咋办(overlap)&#xff0c;事先说明&#xff0c;本文中的绘图均使用一个几行的简单小代码进行了修饰&#xff1a; function defualtAxes axgca;hold on;box on ax.XGridon; ax.YGridon; ax.XMinorTickon; ax.YMinor…