【JUC】循环屏障CyclicBarrier详解

news2024/11/23 7:47:03

前言

jdk中提供了许多的并发工具类,大家可能比较熟悉的有CountDownLatch,主要用来阻塞一个线程运行,直到其他线程运行完毕。而jdk还有一个功能类似并发工具类CyclicBarrier,你知道它的作用吗?和CountDownLatch有什么区别呢?

对于CountDownLatch不了解的可以参考# CountDownLatch源码硬核解析

介绍和使用

CyclicBarrier,循环屏障,用来进行线程协作,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,会触发自己运行,运行完后,屏障又会开门,所有被屏障拦截的线程又可以继续运行。所以CyclicBarrier 是可以重用的。

为了更好的理解,我们举个例子,如下图所示:

我们将屏障想成栅栏,5个线程想成5头猪。5头猪开始往前跑,直到都跑到栅栏前,栅栏开始做个自己的任务,比如看看猪多重。然后打开栅栏,猪又会继续跑,跑到下一个栅栏,就这样循环....

API介绍

构造方法

  • public CyclicBarrier(int parties): 创建parties个线程任务的循环CyclicBarrier
  • public CyclicBarrier(int parties, Runnable barrierAction): 当parties个线程到到屏障出,自己执行任务barrierAction

常用API

  • int await():线程调用 await 方法通知 CyclicBarrier 本线程已经到达屏障

基本使用

我们将上面猪猪的例子通过CyclicBarrier简单做一个实现。

@Slf4j(topic = "c.CyclicBarrierPig")
public class CyclicBarrierPig {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(5);
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            System.out.println("主人看看哪个猪跑最快,最肥...");
        });

        // 循环跑3次
        for (int i = 0; i < 3; i++) {
            // 5条猪开始跑
            for(int j = 0; j<5; j++) {
                int finalJ = j;
                service.submit(() -> {
                   log.info("pig{} is run .....", finalJ);
                    try {
                        // 随机时间,模拟跑花费的时间
                        Thread.sleep(new Random().nextInt(5000));
                        log.info("pig{} reach barrier .....", finalJ);
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
        service.shutdown();
    }
}
复制代码

运行结果:

实现原理

我们已经明白CyclicBarrier的基本使用了,那我们看看它是如何实现的。

成员属性

  • 全局锁:利用可重入锁实现的工具类
// barrier 实现是依赖于Condition条件队列,condition 条件队列必须依赖lock才能使用
private final ReentrantLock lock = new ReentrantLock();
// 线程挂起实现使用的 condition 队列,当前代所有线程到位,这个条件队列内的线程才会被唤醒
private final Condition trip = lock.newCondition();
复制代码
  • 线程数量
// 代表多少个线程到达屏障开始触发线程任务
private final int parties;	
// 表示当前“代”还有多少个线程未到位,初始值为 parties
private int count;			
复制代码
  • 当前代中最后一个线程到位后要执行的任务
private final Runnable barrierCommand;
复制代码
  • 代, 也是用标记一次循环
// 表示 barrier 对象当前 代
private Generation generation = new Generation();
private static class Generation {
    // 表示当前“代”是否被打破,如果被打破再来到这一代的线程 就会直接抛出 BrokenException 异常
    // 且在这一代挂起的线程都会被唤醒,然后抛出 BrokerException 异常。
    boolean broken = false;
}
复制代码

构造方法

public CyclicBarrie(int parties, Runnable barrierAction) {
    // 因为小于等于 0 的 barrier 没有任何意义
    if (parties <= 0) throw new IllegalArgumentException();

    this.parties = parties;
    this.count = parties;
    // 可以为 null
    this.barrierCommand = barrierAction;
}
复制代码

成员方法

  • await():阻塞等待所有线程到位
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

// timed:表示当前调用await方法的线程是否指定了超时时长,如果 true 表示线程是响应超时的
// nanos:线程等待超时时长,单位是纳秒
private int dowait(boolean timed, long nanos) {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 获取当前代
        final Generation g = generation;

        // 【如果当前代是已经被打破状态,则当前调用await方法的线程,直接抛出Broken异常】
        if (g.broken)
            throw new BrokenBarrierException();
		// 如果当前线程被中断了,则打破当前代,然后当前线程抛出中断异常
        if (Thread.interrupted()) {
            // 设置当前代的状态为 broken 状态,唤醒在 trip 条件队列内的线程
            breakBarrier();
            throw new InterruptedException();
        }

        // 逻辑到这说明,当前线程中断状态是 false, 当前代的 broken 为 false(未打破状态)
        
        // 假设 parties 给的是 5,那么index对应的值为 4,3,2,1,0
        int index = --count;
        // 条件成立说明当前线程是最后一个到达 barrier 的线程,【需要开启新代,唤醒阻塞线程】
        if (index == 0) {
            // 栅栏任务启动标记
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    // 启动触发的任务
                    command.run();
                // run()未抛出异常的话,启动标记设置为 true
                ranAction = true;
                // 开启新的一代,这里会【唤醒所有的阻塞队列】
                nextGeneration();
                // 返回 0 因为当前线程是此代最后一个到达的线程,index == 0
                return 0;
            } finally {
                // 如果 command.run() 执行抛出异常的话,会进入到这里
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 自旋,一直到条件满足、当前代被打破、线程被中断,等待超时
        for (;;) {
            try {
                // 根据是否需要超时等待选择阻塞方法
                if (!timed)
                    // 当前线程释放掉 lock,【进入到 trip 条件队列的尾部挂起自己】,等待被唤醒
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 被中断后来到这里的逻辑
                
                // 当前代没有变化并且没有被打破
                if (g == generation && !g.broken) {
                    // 打破屏障
                    breakBarrier();
                    // node 节点在【条件队列】内收到中断信号时 会抛出中断异常
                    throw ie;
                } else {
                    // 等待过程中代变化了,完成一次自我打断
                    Thread.currentThread().interrupt();
                }
            }
			// 唤醒后的线程,【判断当前代已经被打破,线程唤醒后依次抛出 BrokenBarrier 异常】
            if (g.broken)
                throw new BrokenBarrierException();

            // 当前线程挂起期间,最后一个线程到位了,然后触发了开启新的一代的逻辑
            if (g != generation)
                return index;
			// 当前线程 trip 中等待超时,然后主动转移到阻塞队列
            if (timed && nanos <= 0L) {
                breakBarrier();
                // 抛出超时异常
                throw new TimeoutException();
            }
        }
    } finally {
        // 解锁
        lock.unlock();
    }
}
复制代码
  • breakBarrier():打破 Barrier 屏障
private void breakBarrier() {
    // 将代中的 broken 设置为 true,表示这一代是被打破了,再来到这一代的线程,直接抛出异常
    generation.broken = true;
    // 重置 count 为 parties
    count = parties;
    // 将在trip条件队列内挂起的线程全部唤醒,唤醒后的线程会检查当前是否是打破的,然后抛出异常
    trip.signalAll();
}
复制代码
  • nextGeneration():开启新的下一代
private void nextGeneration() {
    // 将在 trip 条件队列内挂起的线程全部唤醒
    trip.signalAll();
    // 重置 count 为 parties
    count = parties;

    // 开启新的一代,使用一个新的generation对象,表示新的一代,新的一代和上一代【没有任何关系】
    generation = new Generation();
}
复制代码

和CountDownLatch的区别

相同点

二者都能让一个或多个线程阻塞等待,都可以用在多个线程间的协调,起到线程同步的作用。

不同点

  1. CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以反复 使用。
  2. CountDownLatch.await 一般阻塞工作线程,所有的进行预备工作的线程执行 countDown,而 CyclicBarrier 通过工作线程调用 await 从而自行阻塞,直到所有工作线程达到指定屏障,所有的线程才会返回各自执行自己的工作。
  3. CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  4. CountDownLatch 会阻塞主线程,CyclicBarrier 不会阻塞主线程,只会阻塞子线程。

总结

本文讲解了CyclicBarrier 的基本功能和使用,同时讲解了它大致的实现。关于CyclicBarrier 具体有什么使用场景,你可能还是比较迷惑,我再举个例子,比如CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/53774.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QA特辑|重点重点!模型开发与部署的标准答案!

11月24日&#xff0c;顶象业务安全大讲堂系列课程之《智能模型平台》正式开讲&#xff0c;顶象人工智能总监无常从从模型平台的现状与需求出发&#xff0c;带大家了解了模型平台的开发环境与部署环境&#xff0c;并且就顶象的Xintell 模型平台 为大家做了演示。 直播也吸引了不…

【LeetCode每日一题】——38.外观数列

文章目录一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【解题思路】七【题目提示】八【时间频度】九【代码实现】十【提交结果】一【题目类别】 字符串 二【题目难度】 中等 三【题目编号】 38.外观数列 四【题目描述】 给定一个正整数 n …

mybatispuls 批处理 rewriteBatchedStatements=true

mybatis-plus原生的批处理 this.saveBatch(list); 实际是一条条处理&#xff0c;特慢&#xff0c;造几万行数据得几分钟以上。 如果加上配置&#xff0c;就十几秒搞定五万行数据入库 &rewriteBatchedStatementstrue

建议收藏——等级保护备案整体流程

等级保护的流程大致为定级—备案—初测—整改—复测—监督检查&#xff0c;备案需先定级。整体备案流程是向属地公安机关提交备案资料&#xff0c;需要先线上提交备案材料。线上审核通过后&#xff0c;再线下提交备案材料。具体如下&#xff1a; 1&#xff0c;先线上提交资料审…

WPF 3D MeshGeometry3D类的Positions和TriangleIndices属性研究

MeshGeometry3D 类&#xff0c;用于生成三维形状的三角形基元&#xff1b; 类的参考在此&#xff1b; https://learn.microsoft.com/zh-cn/dotnet/api/system.windows.media.media3d.meshgeometry3d?viewwindowsdesktop-7.0 写在xaml语法里面是<MeshGeometry3D Positions.…

【Android App】低功耗蓝牙中扫描BLE设备的讲解及实战(附源码和演示 超详细)

需要源码请点赞关注收藏后评论区留言私信~~~ 一、扫描BLE设备 传统蓝牙虽然历史悠久&#xff0c;但它的缺陷也很明显&#xff0c;包括但不限于下列几点&#xff1a; &#xff08;1&#xff09;需要两部设备配对之后才能继续连接&#xff0c;而且连接速度也慢&#xff1b; &a…

数组与字符串总结

一、数组 基本概念 特点&#xff1a;顺序存储&#xff0c;每个元素大小&#xff0c;类型相同&#xff0c;元素有限 高维数组可以转化为一维数组 高维数组存放次序&#xff1a;按行优先或者按列优先 按行优先的寻址公式&#xff1a; 二维数组a[m] [n]: Loc(a[i] [j]) Loc…

Ajax axios JSON Fastjson

1、概述 AJAX (Asynchronous JavaScript And XML) &#xff1a;异步的JavaScript和XML AJAX工作流程如下: 1.1、作用 AJAX作用有以下两方面&#xff1a; 1&#xff09;与服务器进行数据交换&#xff1a;通过AJAX可以给服务器发送请求&#xff0c;服务器将数据直接响应回浏览…

算法训练Day36 贪心算法系列 - 重叠区间问题 | LeetCode435. 无重叠区间;763. 划字母区间;56.合并区间

前言&#xff1a; 算法训练系列是做《代码随想录》一刷&#xff0c;个人的学习笔记和详细的解题思路&#xff0c;总共会有60篇博客来记录&#xff0c;计划用60天的时间刷完。 内容包括了面试常见的10类题目&#xff0c;分别是&#xff1a;数组&#xff0c;链表&#xff0c;哈…

【Linux】快捷键

Ctrl C&#xff1a;终止当前命令

星环科技数据中台解决方案,助力某政府机构建设新型智慧城市

客户背景 城市&#xff0c;是人们工作生活的栖息地&#xff0c;也是展示发展成果的全景图。某政府机构不仅注重城市“中枢大脑”的建设&#xff0c;而且兼顾“神经末梢”的需求&#xff0c;既有技术进步的“面子”&#xff0c;更有民生保障的“里子”。站在新的起点上&#xff…

Linux计划任务管理

一&#xff0c;计划任务管理&#xff1a; 任务管理很宽泛&#xff0c;这里是指的计划任务管理&#xff0c;在指定的时间执行。 1&#xff0c;at命令 &#xff1a; 由atd守护进程来执行&#xff0c;atd进程会定期检查系统上的 /var/spool/at 目录&#xff0c;获取at命令写入的任…

如何建立你的财务体系?

天下人都想同时实现财务管理民主自由&#xff0c;换言之一下&#xff0c;你躺在毛里求斯的海滩上&#xff0c;吹着南风&#xff0c;晒着月亮&#xff0c;还有总收入源源不绝的流向&#xff0c;阿涅尔&#xff1f; 那么&#xff0c;怎样同时实现&#xff1f;坚信我们都知道投资…

准备大半年,面试频繁受挫,Java岗面试为何越来越难?

作为一名优秀的程序员&#xff0c;技术面试都是不可避免的一个环节&#xff0c;一般技术面试官都会通过自己的方式去考察程序员的技术功底与基础理论知识。 如果你参加过一些大厂面试&#xff0c;肯定会遇到一些这样的问题&#xff1a; 1、看你项目都用的框架&#xff0c;熟悉…

什么是链动2+1模式?链动2+1模式玩法解析

链动21模式玩法解析 模式框架&#xff1a;代理、老板 奖励机制&#xff1a;平级奖、见点奖、平级奖、帮扶基金、分红奖 商业模式玩法&#xff1a;每一个代理晋升为老板的同时&#xff0c;都需要给上级代理留下“两个原始种子用户”&#xff0c;咱们这里就俗称“感恩机制”。…

双十二有哪些数码好物值得入手、双十二必买数码好物清单

双十二马上就到了&#xff0c;相信很多小伙伴已经按耐不住想要入手了吧&#xff1f;但如果目前还没什么头绪&#xff0c;不知道买什么的话&#xff0c;现在就不妨来抄一下作业吧&#xff01;近期我整理了一份双十二数码好物清单&#xff0c;都是我从用户评价、产品亮点、折扣力…

python中base64编码

1. base64编码简介 用记事本打开exe、jpg、pdf这些文件时&#xff0c;我们都会看到一大堆乱码&#xff0c;因为二进制文件包含很多无法显示和打印的字符&#xff0c;所以&#xff0c;如果要让记事本这样的文本处理软件能处理二进制数据&#xff0c;就需要一个二进制到字符串的…

Perl与JS的对比分析(数组、哈希)

一、数组 可以对数组进行增删&#xff0c;插入。与JS不同的是这些函数都是全局的&#xff0c;JS则是挂在Array.prototype上。 1&#xff0c;对数组尾部的操作pop&#xff08;删除最后的元素&#xff09;、push&#xff08;在尾部添加&#xff09; 1 2 3 goods qw/pen penci…

[附源码]JAVA毕业设计婚纱影楼服务管理(系统+LW)

[附源码]JAVA毕业设计婚纱影楼服务管理&#xff08;系统LW&#xff09; 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术…

BluePrism里WorkQueue的几种传法和区别

WorkQueue是开发中的好帮手&#xff0c;流程间任务流转非常方便&#xff0c;基本可以取代数据库的场景。 一.循环SourceData单行传入 把Queue的Item Key搭好New Item Data本来为空的collection&#xff0c;每次循环增加一行把Item Key加入New Item Data.Item KeyNew Item Data加…