Java线程间通信方式(3)

news2025/1/4 16:48:56

前文了解了线程通信方式中的CountDownLatch, Condition,ReentrantLock以及CyclicBarrier,接下来我们继续了解其他的线程间通信方式。

Phaser

Phaser是JDK1.7中引入的一种功能上和CycliBarrier和CountDownLatch相似的同步工具,相对这两者而言其用法更加灵活,同时Phaser也支持重用。

在Phaser中将需要协作完成的任务分成多个阶段,每个阶段的参与者可指定,参与者可以随时注册并参与到某个阶段或者取消参与本阶段。以选修课考试为例,说明Phaser的工作逻辑,假设现有选修课3门,政治,历史,地理,各选修人数分别为20,10,10.按Phaser实现考试逻辑如下:

  • 第一阶段考政治,总共应有9名同学参加考试,在考试开始时,8位同学开始答题,另外一位同学未到,考试中途,最后一位同学进入,开始考试,所有同学答题完成后,政治考试结束
  • 第二阶段考历史,总共9名同学参考考试,在考试结束前,3名同学弃考,则实际参与考试有6名同学,所有同学答题完成后,历史考试结束
  • 第三阶段考地理,总共9名同学参与考试,中途无意外,所有同学答题完成后,地理考试结束

至此选修课考试的三个阶段均完成,所以选修课考试这个任务结束,其中第一阶段中晚到参考考试的同学说的就是参与者可以随时注册并参与到某个阶段,第二阶段中弃考的同学说的就是参与者可以随时取消参与本阶段,当所有参与本阶段的参与者均取消,则意味着该阶段完成。

在Phaser中,针对一个阶段而言,每一个参与者都被称为一个party,可以通过构造函数指定参与者数量,也可以通过register使parties(party的总和)自增,当当前阶段的所有参与者等于parties的数量时,此时phase自增1,进入下一个阶段,回调onAdvance方法

Phaser提供的核心函数如下所示:

函数名称描述备注
register()注册一个party,使得parties+1/
bulkRegister(int parties)批量注册party,使得parties变为已有个数与传入参数之和/
arriveAndDeregister()当前任务已完成,使parties计数减1,不会形成阻塞/
arriveAndAwaitAdvance()已达到执行点,线程阻塞,等待下一阶段唤醒继续执行/
awaitAdvance(int phase)参数是一个已完成的阶段编号,通常以已完成任务的arrive或者arriveAndDeregister函数的返回值作为取值,如果传入参数的阶段编号和当前阶段编号相同,则在此处等待,如果不同或者Phaser已经是terminated状态,则立即返回/
arrive()达到当前阶段,不等待其他参与者到达/

arriveAndAwaitAdvance

以上述政治考试为例,学习Phaser基本使用

public static void main(String[] args) {
    // 创建Phaser
    Phaser phaser = new Phaser(){
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println("政治考试完成");
                    break;
                case 1:
                    System.out.println("历史考试完成");
                    break;
                case 2:
                    System.out.println("地理考试完成");
                    break;
            }
            // 如果到达某一阶段,Phaser中参与者为0,则会销毁该Phaser
            return super.onAdvance(phase, registeredParties);
        }
    };
    
    IntStream.range(1,10).forEach(number->{
        phaser.register();
        Thread student= new Thread(()->{
            System.out.println("学生"+number+"arrive advance");
            // 等待其他线程,此时block
            phaser.arriveAndAwaitAdvance();
            System.out.println("学生"+number+"政治开始答题");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("学生"+number+"政治交卷");
            // 考试完成,取消计数,参与者减1
            phaser.arriveAndDeregister();
            System.out.println("Phaser is terminated :" +phaser.isTerminated());
        });
        student.start();
    });
    System.out.println("Phaser is terminated :" +phaser.isTerminated());
}

输出如下:

1-4-5-1

从上面可以看出,Phaser中通过arriveAndAwaitAdvance阻塞当前线程,当所有线程到达阻塞栅栏时,唤醒等待线程继续执行,进而达到线程间同步协作。

awaitAdvance

有时候,当Phaser 在当前阶段结束时,我们需要兜底做一些策略,比如说资源的释放,状态的检查上报等,此时就需要用到awaitAdvance,awaitAdvance接受一个阶段编号,如果当前阶段编号和传入的相等,则会进入等待状态,等到所有参与者都到达该阶段栅栏时,被唤醒。实例代码如下:

public static class ThreadA implements Runnable {
    private Phaser phaser;

    public ThreadA(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " start ");


        phaser.arriveAndAwaitAdvance();

        System.out.println(Thread.currentThread().getName() + " end " );
    }
}

public static class ThreadB implements Runnable {
    private Phaser phaser;

    public ThreadB(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " start " );

        phaser.arriveAndAwaitAdvance();

        System.out.println(Thread.currentThread().getName() + " end ");
    }
}

public static class ThreadC implements Runnable {
    private Phaser phaser;

    public ThreadC(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
            System.out.println(Thread.currentThread().getName() + " start ");
            System.out.println(Thread.currentThread().getName() + " phaser.getPhase()=" + phaser.getPhase());
            phaser.awaitAdvance(0);
            System.out.println(Thread.currentThread().getName() + " end ");
    }
}

public static class ThreadD implements Runnable {
    private Phaser phaser;

    public ThreadD(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " begin sleep");

            Thread.sleep(5000);

            System.out.println(Thread.currentThread().getName() + " sleep completed ");
            phaser.arriveAndAwaitAdvance();

            System.out.println(Thread.currentThread().getName() + " end ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) {
    // 声明Phaser
    Phaser phaser = new Phaser(3) {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println("Phaser arrived at :"+phase);
            return super.onAdvance(phase, registeredParties);
        }
    };

    Thread t1 = new Thread(new ThreadA(phaser));
    Thread t2 = new Thread(new ThreadB(phaser));
    Thread t3 = new Thread(new ThreadC(phaser));
    Thread t4 = new Thread(new ThreadD(phaser));

    t1.setName("ThreadA");
    t2.setName("ThreadB");
    t3.setName("ThreadC");
    t4.setName("ThreadD");

    t1.start();
    t2.start();
    t3.start();
    t4.start();
}

如上代码所示,声明Phaser有三个参与者ThreadA,ThreadB,ThreadD,在三个参与者都执行到arriveAndAwaitAdvance之前,ThreadC 阻塞等待,当三个参与者都执行到arriveAndAwaitAdvance后,回调onAdvance方法,此时被阻塞的参与者被唤醒执行,之后ThreadC被唤醒继续执行,运行结果如下:

1-4-5-2

Exchanger

Exchanger用于两个线程之间的通信,无论哪个线程先调用Exchanger,都会等待另外一个线程调用时进行数据交换,示例代码如下:

private static Exchanger<String> exchanger = new Exchanger<>();

public static void main(String[] args) {
    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName()+" sleep start");
            Thread.sleep(10000);
            System.out.println(Thread.currentThread().getName()+" sleep end");
            System.out.println(Thread.currentThread().getName()+" send data to Exchanger");
            String aa = exchanger.exchange("data from Thread1");
            System.out.println(Thread.currentThread().getName() + "   "+aa);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "Thread1").start();

    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName()+" send data to Exchanger");
            String bb = exchanger.exchange("data from Thread2");
            System.out.println(Thread.currentThread().getName() + "   "+bb);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "Thread2").start();
}

运行输出如下:

1-4-5-3

总结

结合前文,我们一共学习了种线程间通信方式,主要有:

  1. Object.wait/Object.notify/Object.notifyAll + synchronized
  2. Semaphore(信号量)
  3. CountDownLatch
  4. CyclicBarrier
  5. Condition+ReentrantLock
  6. Phaser
  7. Exchanger

大家日常开发中可灵活使用,针对各通信方式比较见下表:

通信方式应用场景是否可重用子任务异常处理备注
Object.wait/Object.notify/Object.notifyAll + synchronized大多数线程通信场景依赖开发者维护,在finally块中完成释放,避免死锁/
Semaphore(信号量)通知唤醒类线程间通信场景依赖开发者维护,在finally块中释放信号量,避免死锁/
CountDownLatch串行多线程运行场景不加处理的话,子任务发生异常导致退出,则所有等待的线程都会一致等待,直到超时时间来临/
CyclicBarrier聚合类线程通信场景不加处理的话,如果在所有线程都到达屏障陷入阻塞前,如果有线程发生异常导致未到达栅栏提前退出,则所有等待在栅栏都会以BrokenBarrierException或InterruptedException异常退出/
Condition+ReentrantLock大多数线程通信场景依赖开发者维护,在finally块中完成释放,避免死锁/
Phaser适用CountDownLatch与CyclicBarrier组合场景依赖开发者维护,在finally块中取消参与者,避免死锁/
Exchanger线程间数据交换场景依赖开发者维护,确保两个线程状态正常,并行运行/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/461395.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mapbox-gl 移动端(H5)位置共享交互

文章目录 前言逆地理编码&#xff1a;获取周边地点地理编码&#xff1a;查询位置大头针选位位置卡片 前言 分享最近写的一个小demo&#xff0c;功能类似微信小程序端的大头针位置共享功能&#xff0c;需要实现的主要功能包括位置查询、周边地点检索、位置定位等&#xff0c;数…

BUUCTF jarvisoj_level0

小白垃圾做题笔记而已&#xff0c;不建议阅读。。。 这道题感觉主要就是64位程序ebp8 题目中给出了shellcode 我们直接将返回地址覆盖就好。 在main函数中调用了vulnerable_function()函数。 vulnerable函数是一个漏洞函数&#xff1a;(存在缓溢出)&#xff0c;我们只需要将…

html-audio标签样式重写思路

搭配slider 组件 ,利用原生audio的属性和方法重写样式 写个样式.监听url变化 初始化绑定播放, 拖动进度条,拖动音量, 静音按钮等事件 const audioRef ref(null) // 绑定audio标签 const playProcess ref(0) // 进度条绑定的值 const volume ref(1) // 音量绑定的值 const …

C++ STL之vector容器

目录 一、vector容器的介绍 二、vector容器的使用 1.vector的构造函数 2.vector的赋值操作 3.vector的容量与大小 4.vector的插入和删除 5.vector的数据存取 6.vector的互换容器 7.算法模块在vector的应用 ①find算法(std) ②sort算法(std) 一、vector容器的介绍 引…

07 - 进程创建大盘点

---- 整理自狄泰软件唐佐林老师课程 查看所有文章链接&#xff1a;&#xff08;更新中&#xff09;Linux系统编程训练营 - 目录 文章目录 1. 进程创建回顾2. 再论进程创建2.1 思考2.2 vfork()深度分析2.3 vfork()要点分析2.4 fork()的现代优化2.5 编程实验&#xff1a;fork() &…

【安卓源码】Binder机制2 -- addService 流程

0、binder 进程间通信原理 一次完整的 Binder IPC 通信过程通常是这样&#xff1a; 首先 Binder 驱动在内核空间创建一个数据接收缓存区&#xff1b; 接着在内核空间开辟一块内核缓存区&#xff0c;建立内核缓存区和内核中数据接收缓存区之间的映射关系&#xff0c;以及内核中…

PHP入门【1】环境搭建

目录 一&#xff0c;安装appserv组合包 二&#xff0c;运行第一个php程序 一&#xff0c;安装appserv组合包 组合包&#xff1a;将apache&#xff0c;mysql&#xff0c;php等服务器软件和工具安装配置完成后打包处理 组合包大大提高了我们的效率&#xff0c;不需要为配置环境…

Linux服务器出现503 服务不可用错误怎么办?

​  HTTP 503 服务不可用错误代码表示网站暂时不可用。无论您是网站访问者还是管理员&#xff0c;503 页面都很麻烦。尽管该错误表明存在服务器端问题&#xff0c;但对于访问者和网络管理员来说&#xff0c;有一些可能的解决方案。本文将解释Linux服务器出现503 服务不可用错…

PowerShell Studio 2023 Crack

PowerShell Studio 2023 Crack SAPIEN Script Packager为MSI Builder添加了ARM64平台支持。 增加了对Microsoft PowerShell 7.2.11和7.3.4的支持。 WiX工具集已更新到3.14。 PowerShell Studio 2023 Crack是可用的功能最强大、功能最完整的PowerShell集成脚本环境(ISE)之一。更…

通达信VCP形态选股公式,憋了好几天才写出来

VCP形态的英文”Volatility Contraction Pattern”的缩写&#xff0c;意思是“波动收缩形态”。VCP形态是全美交易冠军马克米勒维尼的核心交易模式之一&#xff0c;在其著作《股票魔法师》中有详细介绍。 马克米勒维尼把VCP形态比喻为湿毛巾&#xff0c;拧过一次后仍含水&…

动态链接库的链接和运行

本文对动态链接库的链接和运行进行一个总结&#xff0c;为什么要分开说呢&#xff1f;因为链接通过生成可执行文件并不代表运行时能找到依赖的动态库。这与静态库是不一样的&#xff0c;因为静态库在编译完成后会库会编译到可执行程序中&#xff0c;但是动态链接库则不然&#…

最新研究:可审计的具有拜占庭鲁棒的联邦学习方案

本人新论文&#xff0c;可免费下载&#xff1a;https://download.csdn.net/download/liangyihuai/87727720 Y. Liang, Y. Li and B. -S. Shin, “Auditable Federated Learning With Byzantine Robustness,” in IEEE Transactions on Computational Social Systems, doi: 10.…

【Unity3D日常开发】Unity3D中实现UI擦除效果、刮刮卡功能

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址我的个人博客 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 使用Unity3D实现UI的擦拭效果、刮刮卡功能的效果实现方式比较多…

Python基于Pytorch Transformer实现对iris鸢尾花的分类预测,分别使用CPU和GPU训练

1、鸢尾花数据iris.csv iris数据集是机器学习中一个经典的数据集&#xff0c;由英国统计学家Ronald Fisher在1936年收集整理而成。该数据集包含了3种不同品种的鸢尾花&#xff08;Iris Setosa&#xff0c;Iris Versicolour&#xff0c;Iris Virginica&#xff09;各50个样本&am…

BatchNormalization和LayerNormalization的理解、适用范围、PyTorch代码示例

文章目录 为什么要NormalizationBatchNormLayerNormtorch代码示例 学习神经网络归一化时&#xff0c;文章形形色色&#xff0c;但没找到适合小白通俗易懂且全面的。学习过后&#xff0c;特此记录。 为什么要Normalization 当输入数据量级极大或极小时&#xff0c;为保证输出数…

【算法基础】直接插入排序 + 希尔排序

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前正在学习c和算法 ✈️专栏&#xff1a;【C/C】算法 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章有啥瑕疵 希望大佬指点一二 如果文章对你有…

基于人类反馈的强化学习(RLHF)在LLM领域是如何运作的?

基于人类反馈的强化学习在LLM领域是如何运作的&#xff1f; 为什么需要强化学习RLHFPre-training modelReward ModelFine-tune with RL 参考 为什么需要强化学习 指标无法衡量。在过去的nlp任务中&#xff0c;词性标注、机器翻译、语义判别等任务是nlp任务的主力军&#xff0c…

Hytrix原理

这里写目录标题 Hytrix容错机制熔断资源隔离线程池隔离信号量隔离 服务降级请求缓存请求合并 Hystrix流程图 Hytrix容错机制 熔断 在一个统计时间窗口&#xff08;HYST rixCommandProperties.metricsRollingStatisticalWindowInMilliseconds()&#xff09;内&#xff0c;处理…

转化率双倍暴涨——客户自助服务门户

近年来&#xff0c;社交媒体的兴起使客户负责品牌对话。随着电子商务和在线帮助需求的扩大&#xff0c;公司必须满足并超越新的期望&#xff0c;以保持客户满意度。 通过SaleSmartly&#xff08;ss客服&#xff09;自动化流程功能建立客户自助服务是一种双赢的决策&#xff0c…

Ajax XHR响应

文章目录 AJAX 服务器 响应服务器响应responseText 属性responseXML 属性 AJAX 服务器 响应 服务器响应 如需获得来自服务器的响应&#xff0c;请使用 XMLHttpRequest 对象的 responseText 或 responseXML 属性。 属性描述responseText获得字符串形式的响应数据。responseXML…