并发工具类(二):CyclicBarrier

news2024/12/27 0:38:36

1、CyclicBarrier 介绍

      从字面上看 CyclicBarrier 就是 一个循环屏障,它也是一个同步助手工具,它允许多个线程

      在执行完相应的操作后彼此等待共同到达一个屏障点。

      CyclicBarrier可以被循环使用,当屏障点值变为0之后,可以在接下来的的使用中重置屏障点

      值,而无需重新定义一个CyclicBarrier。

      Cyclic循环:所有线程释放后,屏障点的数值可以被重置

       Barrier屏障:让一个或多个线程到达一个屏障点,会被阻塞;屏障点会有一个数值,当每有一

                           个线程到达屏障点时,屏障点数值就会减1操作,并且线程阻塞在屏障点,当屏

                            障点数值变为0时,屏障就会打开,唤醒所有阻塞在屏障点的线程。
                            在释放屏障点之后,可以先执行一个任务,然后让唤醒阻塞的线程继续执行后

                            续任务。 

         CyclicBarrier是一种同步机制,允许一组线程之间互相等待,现成达到屏障点其实是基于

         await方法在屏障点阻塞;等待所有线程到达屏障点后再统一唤醒

         CyclicBarrier 并不是基于AQS来实现的,其是基于ReentrantLock锁的机制来实现对屏障点的

         “减减” 操作以及线程的挂起。

2、CyclicBarrier核心属性&构造方法

public class CyclicBarrier {
    /**
     * 内部类
     */
    private static class Generation {
        //该类用来标记是否被中断过
        //用来表示阻塞时当前party有没有被强制中断
        boolean broken = false;//某个线程由于执行了await()方法进入了阻塞状态,若该线程被执行了中断操作,那么 broken 得值就会变为true
    }

    /** 保证操作屏障值原子性的锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /**
     * 用于阻塞线程的条件变量:若有未到party的线程,那么等待该条件变量上
     * 基于当前的Condition 实现线程的挂起和唤醒
     * */
    private final Condition trip = lock.newCondition();
    /**
     * 屏障数值,与count初始值一致
     * todo 注意:不会对 parties 进行操作,因为 parties 是final修饰,初始化后不能修改
     * */
    private final int parties;//计数器得值
    /*
     * 当屏障数值count到达0时,优先执行当前任务,然后再会唤醒所有等待的线程执行后续任务
     * */
    private final Runnable barrierCommand;
    /**
     * 表示当前party是否被中断过
     * */
    private Generation generation = new Generation();

    /**
     *
     * 屏障数值,初始值与 parties 相等,当每有一个线程到达屏障点时,就会执行count--操作
     */
    private int count;

    //构造方法
    /**
     * 
     * @param parties  屏障点数值
     *        
     * @param barrierAction  当屏障点数值达到0时,优先执行该 barrierAction 任务
     *                        若barrierAction 为null,则直接执行唤醒的线程
     *        
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        //到达屏障点后优先执行的任务
        this.barrierCommand = barrierAction;
    }

    
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
}

3、CyclicBarrier应用场景及示例代码

3.1、将一个任务分成若干个并行的子任务,当所有的子任务全部执行结束后,再继续执行后边的

        工作。

        从这一点上看,CyclicBarrier 功能与CountDownLatch 的功能差不多,但他们运行方式上却

        有很大区别;在 CyclicBarrier 中,每个子任务完成后,子线程调用 CyclicBarrier的await方法

        使当前子线程进入阻塞状态,直到其他所有子线都完成了任务后,他们才能退出阻塞;

        注意:这里CyclicBarrier并没有干预主线程的运行,所以主线程的 “运行/阻塞” 需要我们来

                  手动干预。所以 CyclicBarrier 更像是把“任务分片”而不是计数器,当每个分片任务

                   完成后都会阻塞在“屏障点”,

        把前边CountDwonLatch 的示例使用 CyclicBarrier 来实现,比较 CountDwonLatch 与

        CyclicBarrier 在相同场景下使用的不同,示例代码如下:

public class CylicBarrierExample1 {

    public static void main(String[] args) {

        //先获取商品编号列表
        int[] products = getProductsByCategoryID();
        //使用Stream 流,将商品编号列表中的每个商品转换为 ProductPrice
        List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList());
        //定义 CyclicBarrier ,并设置子任务数
        CyclicBarrier barrier = new CyclicBarrier(list.size());
        //存放线程任务的集合
        final List<Thread> threads = new ArrayList<>();
        list.forEach(pp -> {
            //对每个商品都创建一个子任务来计算
            Thread thread = new Thread(() -> {
                System.out.println(pp.getProdID()+" -> start calculate price.");
                try {
                    //模拟业务逻辑耗时
                    TimeUnit.SECONDS.sleep(current().nextInt(10));
                    if(pp.prodID %2 == 0){
                        pp.setPrice(pp.prodID*0.9D);
                    }else {
                        pp.setPrice(pp.prodID*0.71D);
                    }
                    System.out.println(pp.getProdID()+" -> price calculate");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    try {
                        //当前子任务线程进入阻塞状态,在这里等待所有的子任务线程都执行到共同的屏障点 barrier point
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
            threads.add(thread);
            thread.start();
        });
        //遍历所有的子任务线程,让主线程等待所有的子任务线程结束
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println("**************************************");

        System.out.println("All of price calculate finished!");
        list.forEach(System.out::println);


    }

    //获取商品编号列表
    private static int[] getProductsByCategoryID(){
        //商品列表编号为从1,10的数字
        return IntStream.rangeClosed(1,10).toArray();
    }

    //定义商品类,有2个成员变量:商品编号和商品价格
    private static class ProductPrice{
        private final int prodID;//商品编号
        private double price;//商品价格

        public ProductPrice(int prodID){
            this(prodID,-1);
        }

        public ProductPrice(int prodID,double price){
            this.prodID = prodID;
            this.price = price;
        }

        public int getProdID(){
            return this.prodID;
        }

        public void setPrice(double price){
            this.price = price;
        }

        @Override
        public String toString() {
            return "ProductPrice{" +
                    "prodID=" + prodID +
                    ", price=" + price +
                    '}';
        }
    }
}

        上边这段代码,有个需要优化的地方,即:既然 CyclicBarrier 中所有线程都会阻塞在屏

         障点,所有任务都达到屏障点时才会往下执行,那么我们可以把主线程也作为一个任务线程

        ,即在定义 CyclicBarrier 屏障点数值时,在原有的数值上加1,然后在主线程中执行

         CyclicBarrier的await方法,这样就不用让主线程等待每个子线程执行完成了

         优化代码如下:

public static void main(String[] args) {

        //先获取商品编号列表
        int[] products = getProductsByCategoryID();
        //使用Stream 流,将商品编号列表中的每个商品转换为 ProductPrice
        List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList());
        //定义 CyclicBarrier ,并设置子任务数
        CyclicBarrier barrier = new CyclicBarrier(list.size()+1);
        //存放线程任务的集合
        final List<Thread> threads = new ArrayList<>();
        list.forEach(pp -> {
            //对每个商品都创建一个子任务来计算
            Thread thread = new Thread(() -> {
                System.out.println(pp.getProdID()+" -> start calculate price.");
                try {
                    //模拟业务逻辑耗时
                    TimeUnit.SECONDS.sleep(current().nextInt(10));
                    if(pp.prodID %2 == 0){
                        pp.setPrice(pp.prodID*0.9D);
                    }else {
                        pp.setPrice(pp.prodID*0.71D);
                    }
                    System.out.println(pp.getProdID()+" -> price calculate");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    try {
                        //当前子任务线程进入阻塞状态,在这里等待所有的子任务线程都执行到共同的屏障点 barrier point
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
            threads.add(thread);
            thread.start();
        });
        //主线程也阻塞在屏障点
        barrier.await();
        System.out.println("**************************************");

        System.out.println("All of price calculate finished!");
        list.forEach(System.out::println);


    }

         

3.2、CyclicBarrier 循环使用

        使用 CyclicBarrier 模拟旅游时导游清点人数的场景

         大家报团旅游时,为了安全和避免掉队的,每次登上大巴,大巴启动前,导游都会清点人数

         ;在到达一个景点后,游客下车后,导游也会重复清点人数,保证所有的人都下来了后,才

         会通知大巴师傅去停车场停车,下边写个demo简单模拟下这个场景,

/**
 * CylicBarrier 的循环使用
 */
public class CylicBarrierExample2 {

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        //定义 CyclicBarrier
        final CyclicBarrier barrier = new CyclicBarrier(11);
        //创建10个线程
        for(int i=0;i<10;i++){
            //定义游客子线程,传入游客编号和 barrier
            new Thread(new Tourist(i,barrier)).start();
        }
        //主线程也进入阻塞,等待所有游客都上车
        barrier.await();
        System.out.println("Tour Guilder: all of Tourist get on the bus");
        //主线程进入阻塞,所有游客都下车
        barrier.await();
        System.out.println("Tour Guilder: all of Tourist get OFF the bus");

    }

    //定义游客线程
    private static class Tourist implements  Runnable{
        private final int touristID;
        private final CyclicBarrier barrier;

        private Tourist(int touristID,CyclicBarrier barrier){
            this.touristID = touristID;
            this.barrier = barrier;
        }

        @Override
        public void run() {

            System.out.printf("Tourist: %d by bus\n",touristID);
            //上车耗时
            this.spendSeveralSeconds();
            //上车后等待其他同伴
            this.waitAndPrint("Tourist: %d Get on the bus, and wait other people");

            //todo 注意:所有线程到达屏障点后,最后一个到达屏障点的线程会重置CyclicBarrier
            //          所以这里不需要手动调用reset()重置方法
            //下车耗时
            this.spendSeveralSeconds();
            //下车后等待其他同步全部下车
            this.waitAndPrint("Tourist: %d Get OFF the bus, and wait other people OFF");

        }
        //模拟乘客上车耗时
        private void spendSeveralSeconds(){
            try {
                TimeUnit.SECONDS.sleep(current().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //模拟上车后等待其他同伴
        private void waitAndPrint(String msg){
            System.out.printf(msg,touristID);
            System.out.println();
            try {
                //所有线程到达屏障点后,最后一个到达屏障点的线程会重置CyclicBarrier
                barrier.await();
            } catch (InterruptedException |BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

    }
}

4、CyclicBarrier 常用方法解析

      在 CyclicBarrier 中常用方法就2个,即:await 和带超时时间的await ,但真正执行业务

      的方法其实只有 doawait 一个方法,如下图所示:

              

               

4.1、dowait(boolean timed, long nanos) 方法

         dowait 方法是CyclicBarrier 的核心方法,该方法功能是先将 CyclicBarrier 计数器count减1,

          然后判断减1后的count是否等于0,若等于0,则唤醒所有阻塞在屏障点的线程,并重置

          CyclicBarrier;若减1后的count不等于0,则当前线程被阻塞,直到被其他线程唤醒或过了

          超时时间(有超时时间的情况)

          dowait 代码如下:  

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //获取 Generation 对象的引用
            final Generation g = generation;

            //判断是否有现成中断
            if (g.broken) //表示当前party已经被中断
                throw new BrokenBarrierException();

            //有中断的线程混入其中,则干掉其他所有的线程重新开始
            if (Thread.interrupted()) {//判断当前执行线程是否被中断,若被中断则先调用 breakBarrier()方法,再抛出异常
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;//计数器减1,对屏障点数据做--操作
            if (index == 0) {  // tripped  当 count 为0 得时候,表示是最后一个线程,负责唤醒所有阻塞在条件变量上的线程,然后回调barrierCommand
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;//当前任务线程
                    //优先执行 barrierCommand 任务
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //生成新的 Generation,并且直接返回
                    nextGeneration();//进入下一个party,这时屏障值被重置了,等价与调用了reset()方法
                    return 0;
                } finally {
                    //如果 barrierCommand 方法发生了异常,则设置 broKen标志位
                    if (!ranAction)
                        //中断当前任务线程
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {//循环等待最后一个参与party的线程,或者被中断、等待超时
                try {
                    if (!timed) //表示调用的是非超时时间的await方法,则这里也是调用Condition的不带超时时间的await
                        trip.await();
                    else if (nanos > 0L)//表示调用的是带超时时间的await方法
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //执行到这,说明线程被中断了
                    //g == generation:查看 generation 是否被重置
                    //若 generation 没有被重置,且没有现成被中断,则调用 breakBarrier 方法执行线程中断后的操作
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {//表示 generation 已经被重置或者 有线程已经被中断,则表示本次CyclicBarrier已经作废,则中断当前线程
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                //执行到这里说明线程被唤醒了

                //查看是否因为中断唤醒,若是则抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();

                //查看当前线程是否是因为 generation 重置而被唤醒(即被reset),若是则直接返回index 数值
                //或者任务正常完成也会被重置
                if (g != generation)
                    return index;

                //判断是否是因为到达超时时间被唤醒,若是则中断当前任务
                if (timed && nanos <= 0L) {
                    //中断当前任务
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }


private void nextGeneration() {
        // signal completion of last generation  唤醒屏障点阻塞中的所有线程
        trip.signalAll();
        // set up next generation  修改 count 的值使其等于构造 CyclicBarrier 时传入的parties 值
        count = parties;
        generation = new Generation();//创建新的Generation,即生成下一代party
    }

   
    private void breakBarrier() {
        generation.broken = true; //设置为中断状态
        count = parties;//将计数器设置为构建 CyclicBarrer 时传入得值,即重置屏障点数值count
        trip.signalAll();//唤醒其他所有等待的线程
    }

4.2、reset() 方法

         reset() 方法功能是重置CyclicBarrier

public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //干掉当前所有的线程
            breakBarrier();   // break the current generation
            //生成下一代party
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

4.3、getNumberWaiting() 方法

         该方法功能是返回正在阻塞在屏障点的线程数

public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2107245.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

qt配合halcon深度学习网络环境配置

1.开发环境qt6&#xff0c;编译器MSCV2019&#xff0c;网络是halcon的对象检测&#xff0c;halcon用20. 2.建立qt项目 3.到halcon安装目录下复制include,lib这两个文件夹到qt项目中进行引用 4.引用到halcon静态库后&#xff0c;到halcon运行目录下找到静态库对应dll文件&…

浏览器百科:网页存储篇-如何在Chrome打开localStorage窗格(五)

1.引言 在前面的章节中&#xff0c;我们详细介绍了 localStorage 的基本概念、特性及其常用方法&#xff0c;帮助开发者在网页应用中实现数据的持久化存储。为了更好地管理和调试这些存储的数据&#xff0c;了解如何打开和使用浏览器的 localStorage 窗格是非常重要的。本篇文…

js实现lua解释器,类似halcon代码编辑器一行一行解释执行

解释器 只能一行一行执行&#xff0c;不能有一行代码跨越多行&#xff0c;不支持lua的表&#xff0c;只支持for i的循环&#xff0c;支持自定义函数&#xff0c;并且可以跳到函数里面一行一行执行&#xff0c;这里的函数并不是lua的函数&#xff0c;而是由js状态控制执行函数里…

DBETX-1X/250G24-8NZ4M比例溢流阀配套HE-SP1比例放大器

0811402019|DBETX-1X/250G24-8NZ4M比例溢流阀配套HE-SP1比例放大器主要是一种电液控制技术&#xff0c;用于调节液压系统中的压力&#xff0c;通过BEUEC比例放大器电气输入信号控制阀口的开度&#xff0c;实现对系统压力的精准控制。 比例溢流阀技术的关键在于其能够将电信号转…

振动分析-26-频域分析之深入理解功率谱和功率谱密度的计算过程

1 什么是PSD(功率谱密度) 功率谱密度(Power Spectral Density),以及其与Autopower(自功率谱)的区别。 1.1 PSD的定义 PSD——Power Spectral Density是表征信号的功率能量与频率的关系的物理量。 PSD经常用来研究随机振动信号。 PSD通常根据频率分辨率做归一化。 对于振…

Qt人脸识别与分析系统

项目源码地址https://github.com/fufufu11/QT5-FacialDetection 项目概述 本项目是一款基于Qt5框架构建的人脸检测应用程序&#xff0c;支持多摄像头选择和用户友好的图形界面。系统集成了百度的人脸检测API&#xff0c;能够通过HTTPS协议POST方法安全地发送请求&#xff0c;并…

一个基于共享内存的内存数据库:2 设计

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 源码指引&#xff1a;github源…

opencv轮廓近似,模板匹配

在图像处理领域&#xff0c;轮廓近似和模板匹配是两种非常关键的技术&#xff0c;它们广泛应用于计算机视觉、图像分析和图像识别等多个方面。本文将详细介绍如何使用OpenCV库进行轮廓近似和模板匹配&#xff0c;并给出具体的代码示例。 一、轮廓近似&#xff08;Contour Appr…

使用stripe进行在线支付、退款、订阅、取消订阅功能(uniapp+h5)

stripe官网:Stripe 登录 | 登录 Stripe 管理平台 然后在首页当中打开测试模式,使用测试的公钥跟私钥进行开发 测试卡号 4242 4242 4242 4242 1234 567 在线支付 stripe的在线支付有两种,第一种就是无代码,第二中就是使用api进行自定义,一般来说推荐第二种进行开发 无…

谁还只会用OBS?多场景录制试试这四款!

很多人在录屏的时候&#xff0c;尤其是打游戏的朋友&#xff0c;第一时间想到的都是OBS&#xff0c;其实除了这款工具&#xff0c;还有很多好用的第三方录屏工具&#xff0c;一样可以帮助我们录制出不卡顿的高清视频。今天&#xff0c;我们就来对比一下市面上四款热门的录屏软件…

echarts环形图

let dataValue[{value: 30,name: 桥梁,percent: 0.25,color: rgba(248,95,94,1),radius: [75%, 80%],center: [22%, 50%],},{value: 15,name: 隧道,percent: 0.25,color: rgba(243,185,71,1),radius: [65%, 70%],center: [22%, 50%],},{value: 18,name: 路基,percent: 0.25,col…

类似antdesign悬浮按钮上浮小动画【已验证,正常运行】

以下是基于vue2的完整代码&#xff0c;习惯用vue写了&#xff0c;如果是其他框架复制div和css就行 部分代码来自我搭建的GPT4o/Claude <template><div class"progress-container"><div class"circlenav-container"><div class"…

深度学习——引言

一、机器学习的关键因素 1.1 数据 每个数据集由一 个个样本组成&#xff0c;大多情况下&#xff0c;数据遵循独立同分布。通常每个样本由一组特征属性组成。 好的数据集 { 数据样本多 正确的数据 ( g a r b a g e i n , g a r b a g e o u t ) 好的数据集 \begin{cases} 数据…

通俗易懂理解Hive四种排序

前言 Hive的四种排序包括Sort By、Order By、Distribute By和Cluster By。有关这四种排序的区别&#xff0c;在大数据面试中可能会经常被问到&#xff0c;在我们很多人的实际应用中可能最常用的就是全局排序order by&#xff0c;因此对于其他几个排序理解并不准确&#xff0c;…

ardupilot开发 --- 炸酱面 篇

我的头可不是面头捏的 奥维互动地图ovital航点文件转Mission planner航点文件 奥维互动地图ovital航点文件转Mission planner航点文件 gcj02 转 wgs84 奥维互动地图&#xff1a;https://www.ovital.com 航线1.ovjsn 转换工具&#xff1a;https://github.com/huangyangl/geo_c…

Linux之grep命令

在文本文件中过滤&#xff0c;包含指定字符串的行 – grep [选项] 字符串 文本文件...• 常用命令选项 – -v&#xff0c;取反匹配 – -i&#xff0c;忽略大小写 ]# grep root /etc/passwd #包含root的行 ]# grep -v root /etc/passwd #不包含root ]# grep ROOT…

操作系统的功能及应用

操作系统介绍 操作系统&#xff08;Operating System, OS&#xff09;是计算机系统中不可或缺的核心软件&#xff0c;它负责管理和控制计算机硬件与软件资源&#xff0c;提供用户与计算机之间的交互界面。本文将详细探讨操作系统的功能、分类及其在现代社会中的应用。 操作系统…

通过redis-operator 来部署 Redis Cluster 集群

安装 Redis Operator 首先&#xff0c;需要安装 redis-operator。可以通过 Helm 或直接应用 YAML 文件来安装。 使用 Helm 安装&#xff1a; helm repo add ot-helm https://ot-container-kit.github.io/helm-charts/ helm install redis-operator ot-helm/redis-operator --…

2024 年的 Web3 游戏:演变、趋势和市场动态

Web3 游戏行业在经历了多年的快速发展和变革之后&#xff0c;正在2024年迎来全新的阶段。这个行业从最初的边玩边赚&#xff08;Play-to-Earn, P2E&#xff09;模式出发&#xff0c;如今正在向更为平衡的“边玩边赚”模式转型。这种转型不仅解决了早期 P2E 模式下存在的可持续性…

自动驾驶真正踏出迈向“用户”的第一步:IROS24新SOTA提出个性化的实例迁移模仿学习

导读&#xff1a; 本文针对自动驾驶规划任务&#xff0c;提出了一种基于实例的迁移模仿学习方法&#xff0c;通过预先训练的微调框架从专家域迁移专业知识&#xff0c;以解决用户域数据稀缺问题。实验结果显示&#xff0c;该方法能有效捕捉用户驾驶风格并实现具有竞争力的规划性…