CyclicBarrier学习一

news2024/11/26 12:50:16

一、定义
CyclicBarrier 字面意思回环栅栏(循环屏障),通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
CyclicBarrier 作用是让一组线程相互等待,当达到一个共同点时,所有之前等待的线程再继续执行,且 CyclicBarrier 功能可重复使用。
CyclicBarrier 可以 让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会释放。

注意,被阻塞的线程 ,不是同时 被释放的。但时间间隔几乎 可以说没有。

二、使用

package com.util;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.logging.Logger;

/**
 * @author : lssffy
 * @Description : 循环屏障
 * @date : 2023/12/17 19:47
 */
public class CyclicBarrierTest {

    private static final Logger logger = Logger.getLogger("CyclicBarrierTest");

    public static void main(String[] args) {
        //屏障的线程数量:5
        CyclicBarrier cb = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            int flag = i;
            new Thread(()->{
                try {
                    //最后一个线程休眠3秒,看看所有到达屏障的线程 也是休眠3秒之后再执行
                    if(flag%5==4){
                        Thread.sleep(5000);
                    }
                    //阻塞在这里,直到到达屏障的线程数量=5才会被释放
                    cb.await();
                    System.out.println(Thread.currentThread().getName());
                }catch (InterruptedException e) {
                    logger.info("Thread interrupted" +e.getMessage());
                }catch (BrokenBarrierException e){
                    logger.info("BrokenBarrierException" + e.getMessage());
                }
            }).start();
        }
        System.out.println("线程执行完毕");
    }
}

最终执行结果,是最后到达屏障的最先执行,其次才是之前到达屏障阻塞的所有线程,其他线程被释放的顺序,也不一定是按照await方法的调用顺序来定的
在这里插入图片描述
3、方法
构造方法
一个有参构造,一个无参构造,有参构造相对于无参构造来说 多传入了一个Runnable实例, 当一个周期的屏障被 释放后, 可以执行 Runnable实例的 run方法逻辑。

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
重要方法

/**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due to an exception
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the
     * method will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
     * waiting, then all other waiting threads will throw {@link
     * BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses.
     *         In this case the barrier will be broken.
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was broken
     *         when {@code await} was called, or the barrier action (if
     *         present) failed due to an exception
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

循环 通过reset()方法可以进行重置

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

4、原理
CyclicBarrier 是利用 ReentrantLock锁 和 ReentrantLock里的条件队列 来实现 所有线程 在屏障处 阻塞的效果的。
dowait逻辑

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock(); //加锁
        try {
        	//判断count 需要到达屏障的线程数
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                	//这个就是有参构造传进来的第二个参数Runnable实例
                    final Runnable command = barrierCommand;
                    if (command != null)
                    	//最后一个到达屏障的线程 同步调用
                        command.run();
                    ranAction = true;
                    //释放所有
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                    	// 释放,进入条件队列阻塞
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
        	// 释放锁
            lock.unlock();
        }
    }

CyclicBarrier 流程
主要是的流程:
1、获取锁 如果 count != 0 就进入阻塞;
2、进入阻塞之前,首先需要进入条件队列,然后释放锁,最后阻塞;
3、如果 count != 0 会进行一个唤醒,将所有的条件队列中的节点转换为阻塞队列;
4、被唤醒过后会进行锁的获取,如果锁获取失败,会进入 lock 的阻塞队列;
5、如果锁获取成功,进行锁的释放,以及唤醒,同步队列中的线程。

CyclicBarrier 与 CountDownLatch的区别
1、CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
2、CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、isBroken(用来知道阻塞的线程是否被中断)等方法。
3、CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
4、CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。 5、CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。 6、CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
7、CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
8、CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1319304.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue写了这么久了您是否知道:为什么data属性是一个函数而不是一个对象?

一、实例和组件定义data的区别 vue实例的时候定义data属性既可以是一个对象&#xff0c;也可以是一个函数 const app new Vue({el:"#app",// 对象格式data:{foo:"foo"},// 函数格式data(){return {foo:"foo"}} })组件中定义data属性&#xff…

信号与线性系统翻转课堂笔记4——连续LTI系统的微分方程模型与求解

信号与线性系统翻转课堂笔记4——连续LTI系统的微分方程模型与求解 The Flipped Classroom4 of Signals and Linear Systems 对应教材&#xff1a;《信号与线性系统分析&#xff08;第五版&#xff09;》高等教育出版社&#xff0c;吴大正著 一、要点 &#xff08;1&#x…

Zotero攻略

给大家分享一下我对于Zotero的使用。 1、下载链接 Zotero | Your personal research assistant 进入后直接下载即可 2、一些好用的插件 &#xff08;1&#xff09;Zotero Connector 下载地址&#xff1a;Zotero | Connectors 超级好用&#xff01;不用一篇一篇下PDF了&am…

geemap学习笔记028:Landsat8计算时间序列NDVI并导出

前言 本节则是以Landsat8影像数据为例&#xff0c;进行NDVI时间序列计算&#xff0c;并将得到的时间序列NDVI进行展示并导出。 1 导入库并显示地图 import ee import geemap import datetime import pandas as pd import os ee.Initialize()2 定义时间范围 # 定义日期范围 …

记录 | Visual Studio报错:const char*类型的值不能用于初始化char*类型

Visual Studio 报错&#xff1a; const char *”类型的值不能用于初始化“char *”类型的实体错误 解决办法&#xff1a; 1&#xff0c;强制类型转换&#xff0c;例如&#xff1a; char * Singer::pv[] {(char*)"other", (char*)"alto", (char*)"c…

【网络协议】网络运维管理神经-SNMP协议

文章目录 什么是SNMP&#xff1f;SNMP的组件SNMP的历史版本SNMP端口SNMP配置案例SNMP工作原理SNMP的基本工作原理SNMP的操作类型SNMP TrapsSNMP Inform SNMP的应用场景推荐阅读 什么是SNMP&#xff1f; SNMP&#xff08;Simple Network Management Protocol&#xff0c;简单网…

学习黑马vue

项目分析 项目下载地址&#xff1a;vue-admin-template-master: 学习黑马vue 项目下载后没有环境可参考我的篇文章&#xff0c;算是比较详细&#xff1a;vue安装与配置-CSDN博客 安装这两个插件可格式化代码&#xff0c;vscode这个软件是免费的&#xff0c;官网&#xff1a;…

2023年OceanBase开发者大会-核心PPT资料下载

一、峰会简介 2023年OceanBase开发者大会主要涵盖了OceanBase的最新技术进展、产品更新以及开发者工具的发布。大会发布了OceanBase 4.1版本&#xff0c;公布了两大友好工具&#xff0c;升级了文档的易用性&#xff0c;并统一了企业版和社区版的代码分支。这些举措全面呈现了O…

2017年AMC8数学竞赛中英文真题典型考题、考点分析和答案解析

从战争中学习战争最有效。前几天&#xff0c;六分成长分析了2023年、2022年、2020、2019、2018年的AMC8的典型考题、考点和详细答案解析。 今天继续为大家分享2017年的AMC8的五道典型考题&#xff0c;所有的这些试题六分成长独家制作了在线版本&#xff0c;适合各种终端和设备…

Android Compose Transition 动画

Transition 是一种动画效果&#xff0c;用于在组件的状态之间进行平滑的过渡。它可以帮助我们在状态变化时&#xff0c;以一种流畅的方式更新 UI。通过使用 Compose 的 Transition API&#xff0c;您可以在应用中创建各种各样的动画效果&#xff0c;从而增强用户体验并提高应用…

智能五子棋1

*一、项目需求* 五子棋是一种简单的黑白棋&#xff0c;历史悠久&#xff0c;起源于中国&#xff0c;后传入日本&#xff0c;在日本被称为“连珠”&#xff0c;是一种老少皆宜的益智游戏。 人工智能五子棋系统的目标用户是一切想致力于研究人机对弈算法理论的相关研究者和一切…

I2C总线(一)核心

基于linux-3.14.16 一、简介 硬件上&#xff0c;i2c总线由&#xff0c;i2c控制器、i2c总线、i2c设备组成。 驱动代码将通过设置i2c寄存器&#xff0c;从而在总线上产生数据信息&#xff0c;来和i2c设备通信&#xff08;读/写&#xff09;。 i2c核心&#xff0c;主要的功能包…

spring boot 配置多数据源 踩坑 BindingException: Invalid bound statement (not found)

在上一篇&#xff1a;《【已解决】Spring Boot多数据源的时候&#xff0c;mybatis报错提示&#xff1a;Invalid bound statement (not found)》 凯哥(凯哥Java) 已经接受了&#xff0c;在Spring Boot配置多数据源时候&#xff0c;因为自己马虎&#xff0c;导致的一个坑。下面&a…

【操作系统】实验一 熟悉LINUX环境和命令

实验名称&#xff1a; 实验一 熟悉LINUX环境和命令 实验目的&#xff1a; 1. 了解UNIX/LINUX的命令及使用格式。 2.熟悉UNIX/LINUX的常用基本命令。 3. 练习并掌握LINUX提供的vi编辑器来编译C程序 4. 学会利用gcc、gdb编译、调试C程序 实验内容&#xff1a; 熟悉UNIX/LINUX的…

YOLOv5改进 | 卷积篇 | SPD-Conv空间深度转换卷积(高效空间编码技术)

一、本文介绍 本文给大家带来的改进内容是SPD-Conv&#xff08;空间深度转换卷积&#xff09;技术。SPD-Conv是一种创新的空间编码技术&#xff0c;它通过更有效地处理图像数据来改善深度学习模型的表现。SPD-Conv的基本概念&#xff1a;它是一种将图像空间信息转换为深度信息…

Java_Lambda表达式JDK8新特性(方法引用)

一、Lambda表达式 接下来&#xff0c;我们学习一个JDK8新增的一种语法形式&#xff0c;叫做Lambda表达式。作用&#xff1a;用于简化匿名内部类代码的书写。 1.1 Lambda表达式基本使用 怎么去简化呢&#xff1f;Lamdba是有特有的格式的&#xff0c;按照下面的格式来编写Lamd…

如何用Python向图像中加入噪声

我们在做机器视觉项目的过程中&#xff0c;有的时候需要向图像中加入噪声。Pytorch本身不支持类似的功能&#xff0c;如果自己写的话&#xff0c;不但麻烦&#xff0c;而且容易出错。好在skimage支持这个功能。代码如下&#xff1a; import skimage import matplotlib.pyplot …

Web安全漏洞分析—文件包含

在当今数字化时代&#xff0c;随着Web应用程序的广泛应用&#xff0c;网络安全问题愈加凸显。其中&#xff0c;文件包含漏洞作为一种常见但危险的安全隐患&#xff0c;为恶意攻击者提供了可乘之机。在这篇博客中&#xff0c;我们将深入探讨文件包含漏洞的本质、攻击手法以及应对…

信息收集 - 域名

1、Whois查询: Whois 是一个用来查询域名是否已经被注册以及相关详细信息的数据库(如:域名所有人、域名注册商、域名注册日期和过期日期等)。通过访问 Whois 服务器,你可以查询域名的归属者联系方式和注册时间。 你可以在 域名Whois查询 - 站长之家 上进行在线查询。 2、…

python提取图片型pdf中的文字(提取pdf扫描件文字)

前言 文字型pdf提取&#xff0c;python的库一大堆&#xff0c;但是图片型pdf和pdf扫描件提取&#xff0c;还是有些难度的&#xff0c;我们需要用到OCR&#xff08;光学字符识别&#xff09;功能。 一、准备 1、安装OCR&#xff08;光学字符识别&#xff09;支持库 首先要安…