控制并发流程,做好线程间的协调

news2024/10/6 4:03:52

一、概述

1. 什么是控制并发流程?

线程一般是由线程调度器自动控制的,但有些场景需要按照我们程序员的意愿去实现多线程之间相互配合,从而满足业务逻辑。比如:

  • 让线程A等待线程B执行完后再执行等一些相互合作的逻辑;
  • 或一系列线程等待一个线程运行完毕或发出信号之后再执行

2. 控制并发流程工具类

在这里插入图片描述

二、 CountDownLatch 倒计时门栓

倒数(向下计数、倒着计数)count为0后,那些执行了 await() 方法陷入阻塞的线程就被唤醒继续执行。就像去做过山车时,等到空余座位为0时,就会发车。

1. 主要方法介绍

  • CountDownLatch(int count):仅有这一个构造函数,参数count为需要倒数的数值。
  • await(): 调用 await() 方法的线程会被挂起,它会等待直到count值为0才继续执行。
  • countDown() :将count值减1,等到为0时,那些等待的线程会被唤起。

2. 图解

  • 在构造方法中指定倒数count值;
  • 调用await的线程Ta会被挂起;
  • 每调用countDown(),倒数count会减1;但是该线程不会被挂起,依然执行
  • 当倒数count值为0时,之前执行await的线程就被唤醒,开始执行

3、代码演示

(1)用法①:一等多

一个线程等待多个线程都发出信号后,再继续自己的工作:

/**
 *      工厂中,质检,5个工人检查,当5个人都认为通过,才认为这个质检通过
 */
public class CountDownLatchDemo1 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);//指定倒数count为5
        ExecutorService pool = Executors.newFixedThreadPool(5);//线程池创建5个线程

        //5次任务质检
        for (int i = 0; i < 5; i++) {
            int no = i+1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + ":质检完毕");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();//倒计count减1
                    }
                }
            };

            pool.submit(runnable);
        }
        System.out.println("等待5个人质检完。。。。。");
        countDownLatch.await();//主线程等待倒时count=0,释放所有挂起线程,并主线进行工作
        System.out.println("质检完成");

    }
    
}

image-20230615103516182

(2) 用法②:多等一

多个线程等待某个线程发出信号后,同时开始执行

/**
 *      模拟100m跑步,5名选手都准备好了,只能裁判员一生令下,5人同时跑出
 */
public class CountDownLatchDemo2 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(5);//线程池创建5个线程

        for (int i = 0; i < 5; i++) {
            int no = i+1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("NO." + no + ":准备完毕,等待发令");
                    try {
                        countDownLatch.await();
                        System.out.println("No." + no + ":开始跑步");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            pool.submit(runnable);
        }
        //主线程,模拟裁判员
        Thread.sleep(5000);
        System.out.println("发令枪响,比赛开始");
        countDownLatch.countDown();
    }
}

(3)综合用法:多等一 & 一等多

多等一:5名运动员等待裁判员打枪开跑;一等多:裁判员等5名运动员到达终点;

/**
 *      模拟100m跑步,5名选手都准备好了,只能裁判员一生令下,5人同时跑出,当所有人都到终点后,比赛结束
 */
public class CountDownLatchDemo3 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(5);
        ExecutorService pool = Executors.newFixedThreadPool(5);//线程池创建5个线程

        for (int i = 0; i < 5; i++) {
            int no = i+1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("NO." + no + ":准备完毕,等待发令");
                    try {
                        begin.await();
                        System.out.println("No." + no + ":开始跑步");
                        Thread.sleep((long) (Math.random()*10000));
                        System.out.println("No." + no + ":到达终点");
                        
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        end.countDown();
                    }
                }
            };
            pool.submit(runnable);
        }
        //主线程,模拟裁判员
        Thread.sleep(5000);
        System.out.println("发令枪响,比赛开始");
        begin.countDown();
        end.await();
        System.out.println("比赛结束!");
    }
}

4、注意点

  • 他不能重复使用,当倒数count=0,该实例就失效了;
  • 如果要再次使用,需要再实例化新的对象
  • 可以实现多等多的情况

三、Semaphore 信号量

1. 作用

对于一些重量级服务,如执行时间长、处理消耗资源大,设置一下同时并发执行任务的线程个数,从而保障服务平稳运行。

  • 用来限制或管理数量的有限资源的使用情况
  • 类似于生活中的"许可证",许可证数量有限,并且只有拿到“许可证”的线程才允许运行

2. 图解

3. 重要方法

  • new Semaphore(int permits,boolean fair):初始化Semaphore并指定许可证的数量。这里可以设置是否使用公平策略,如果传入true,那么Semaphore会把之前等待的线程放到FIFO的队列里,以便于当有了新的许可证可以分发给之前等了最长时间的线程;
  • tryAcquire() :看看现在有没有空闲的许可证,如果有的话就获取,如果没有的话也没关系,我不会陷入阻塞,我可以去做别的事,过一会再来查看许可证的空闲情况。
  • tryAcquire(timeout): 和tryAcquire() 一样,但是多了一个超时时间,比如“在3秒内获取不到许可证,我就去做别的事”
  • acquire():获取许可证,可响应中断
  • acquireUninterruptibly():获取许可证,拒绝响应中断
  • release():释放许可证

4、代码演示

(1)一般用法:

public class SemaphoreDemo {
    static Semaphore semaphore = new Semaphore(3,true);
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(50);

        for (int i = 0; i < 100; i++) {
            pool.submit(new Task());
        }
        pool.shutdown();

    }
    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+":拿到许可证");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+":释放许可证");
            semaphore.release();
        }
    }
}

image-20230615110440704

(2) 特殊用法

一次性获取或释放多个许可证。

public class SemaphoreDemo {

    static Semaphore semaphore = new Semaphore(5, true);

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 100; i++) {
            service.submit(new Task());
        }
        service.shutdown();
    }

    static class Task implements Runnable {

        @Override
        public void run() {
            try {
                semaphore.acquire(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "拿到了许可证");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "释放了许可证");
            semaphore.release(3);
        }
    }
}

什么时候需要一次性获取多个许可证:

比如 TaskA 会调用很消耗资源的 method1(),而 TaskB 调用的是不太消耗资源的 method2(),假设我们一共有5个许可证。那么我们就可以要求 TaskA 获取5个许可证才能执行,而 TaskB 只需要获取到一个许可证就能执行,这样就避免了A和B同时运行的情况,我们可以根据自己的需求,通过分配许可证的方式合理分配资源。

5、注意点

  • 获取&释放的许可证数量要求必须一致,否则程序运行到最后,许可证都会被占用,就会都陷入阻塞
  • 根据情况设置公平性,一般设置为true,这样可以避免线程饥饿
  • 释放和获取对线程没有要求,可以由这个线程A获取,别的线程B释放
  • 可以将它实现成一个轻量级的CountDownLatch,比如信号量Semaphore的许可证数量为1,线程A获取到,线程B执行 acquire() 再获取时,就会陷入阻塞,线程A 执行release() 之后,线程B才能执行,相当于 CountDownLatch 的唤醒

四、Condition条件对象

1. 作用

当线程1需要等待某个条件的时候,它就去执行condition.await0方法,一旦执行了await0方法,线程就会进入阻塞状态。

然后通常会有另外一个线程,假设是线程2,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行condition.signal() 方法,这时JVM就会从被阻塞的线程里,找到那些等待该condition的线程,这时线程1就会收到可执行信号,它的线程状态就会变成Runnable可执行状态

2. 图解

3、signalAll() 和 signal() 的区别

  • signalAll()唤醒所有等待的线程,signal()唤醒一个
  • signal()是公平的,会唤醒等待时间最长的线程

4、代码演示

(1)基本用法

condition一般是绑定在锁lock上面的,基本用法如下:

public class ConditionDemo1 {
    private ReentrantLock lock = new ReentrantLock();
    private Condition condtion = lock.newCondition();

    void method1() throws InterruptedException {
        lock.lock();
        try{
            System.out.println("条件不满足,开始wait");
            condtion.await();
            System.out.println("条件满足,开始执行后续的任务");
        }finally {
            lock.unlock();
        }
    }

    void method2(){
        lock.lock();
        try{
            System.out.println("准备工作完成,开始唤醒其他线程");
            condtion.signal();
        }finally {
            lock.unlock();
        }
    }

    //主函数
    public static void main(String[] args) throws InterruptedException {
        ConditionDemo1 demo1 = new ConditionDemo1();
        //主线程创建一个线程1
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    demo1.method2();//线程1,1秒后,唤醒主线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        demo1.method1();//主线程阻塞
    }
    
}

image-20230615112138646

(2)用Condition实现生产者消费者模式

/**
 *      演示Condition实现生产者消费者模式
 */
public class ConditionDemo2 {
    private static int queueSize = 10;
    private static PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);

    private static ReentrantLock lock = new ReentrantLock();
    private static Condition noFull = lock.newCondition();
    private static Condition notEmpty = lock.newCondition();

    //消费者
    static class Consumer extends Thread {
        @Override
        public void run() {
            comsume();
        }

        //消费操作
        void comsume(){
            while (true){
                lock.lock();
                try {
                    while (queue.size()==0){
                        System.out.println("队列空,等待数据");
                        notEmpty.await();
                    }
                    queue.poll();
                    noFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列还剩余空间"+(queueSize-queue.size())+"个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //生产者
    static class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }

        //生产操作
        void produce(){
            while (true){
                lock.lock();
                try {
                    while (queue.size()==queueSize){
                        System.out.println("队列满,等待消费");
                        noFull.await();
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("给队列生成了一个数据,队列有"+queue.size()+"个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //主函数
    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        Producer producer = new Producer();
        consumer.start();
        producer.start();
    }

}

5. 注意点

  • 实际上,如果说Lock可以用来代替synchronized,那么Condition就可以用来代替相对应的Obiect.wait/notify的,所以Condition在用法和性质上,几乎和Obiect.wait/notify都一样
  • await方法执行后会自动释放持有的Lock锁,和Object.wait一样,不需要自己手动先释放锁
  • 调用await的时候,必须持有锁,否则会抛出异常,和Object.wait一样

五、CyclicBarrier循环栅栏

1. 作用

  • CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程
  • 当有大量线程相互配合,分别计算不同任务,并且需要最后统汇总的时候,我们可以使用CyclicBarrier。CvclicBarrier可以构造一个集结点,每一个线程执行完毕后,都会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务
  • 就像我们在生活中聚会时,首先约定咱们3个人明天中午在学校碰面,都到齐后再一起继续接下来的安排。
  • CyclicBarrier循环栅栏是可以重复使用的,这一点和 CountDownLatch 不一样

2、代码演示

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //参数1:设置几个等待数
        //参数2:当线程数满足条件后,执行的任务
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("五个到齐,走一波");
                System.out.println(Thread.currentThread().getName());
            }
        });

        //创建10个线程
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i,cyclicBarrier)).start();
        }

    }

    static class Task implements Runnable{
        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程"+id+",现在前往集合地点");
            try {
                Thread.sleep((long) (Math.random()*10000));
                System.out.println("线程"+id+":达到集合地点,开始等待其他人到达");
                cyclicBarrier.await();//陷入等待
                System.out.println("线程"+id+":出发了!!!");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

}

每当凑齐五个之后,就出发一波,cyclicBarrier可以被重复使用。

3、CountDownLatch & CyclicBarrier 的区别

  • 作用不同

    • CountDownLatch对应的是事件,完成某个事件就可以调用countDown(),一个线程可执行多次countDown(),且该线程不会阻塞
    • CyclicBarrier对应的是线程,每个线程都执行await(),线程执行完await()会阻塞,达到指定的数量才会继续运行,
  • 可重用性不同

    • CountDownLatch:倒数count到0之后,该实例就不能再使用
    • CyclicBarrier:满足指定的数量条件后,就继续执行,且可再次使用
  • 结束后统一工作

    • CountDownLatch:结束后就只是唤醒线程继续工作
    • CyclicBarrier:可以在CyclicBarrier的构造函数中自定义 runnable 任务,结束后会执行该任务

点我扫码关注微信公众号

文章来源:控制并发流程,做好线程间的协调


个人微信:CaiBaoDeCai

微信公众号名称:Java知者

微信公众号 ID: JavaZhiZhe

谢谢关注!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/649296.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【表面缺陷检测】基于yolov5的钢轨表面缺陷检测(附代码和数据集,Windows系统)

写在前面: 首先感谢兄弟们的订阅,让我有创作的动力,在创作过程我会尽最大能力,保证作品的质量,如果有问题,可以私信我,让我们携手共进,共创辉煌。 路虽远,行则将至;事虽难,做则必成。只要有愚公移山的志气、滴水穿石的毅力,脚踏实地,埋头苦干,积跬步以至千里,就…

python打包后报错,无法启动,电脑缺少api-ms-win-core-path-11-1-0.dll

参考&#xff1a;《运行打包python程序时报&#xff1a;无法启动此程序&#xff0c;因为计算机中丢失 api-ms-win-core-path-l1-1-0.dll 尝试重新安装该程序以解决此问题。》 原因&#xff1a;python版本较高&#xff0c;打包时的python版本是python3.10&#xff0c;而运行打包…

mdBook介绍及使用——使用 Markdown 创建你自己的博客和电子书

目录 介绍一、下载与创建项目1.下载2.初始化3.结构说明 二、编写文章与启动1.编写文章2.构建3.启动 mdbook 服务 三、其他配置 介绍 mdBook 是一个使用 Markdown 创建书籍的命令行工具。它非常适合创建产品或 API 文档、教程、课程材料或任何需要清晰、易于导航和可定制的演示…

LED开关电源里的PCB回路设计应该怎么做?

LED开关电源的研发速度在最近几年中有了明显的技术飞跃&#xff0c;新产品更新换代的速度也加快了许多。作为最后一个设计环节&#xff0c;PCB的设计也显得尤为重要&#xff0c;因为一旦在这一环节出现问题&#xff0c;那么很可能会对整个的LED开关电源系统产生较多的电磁干扰&…

界面控件DevExtreme UI组件——增强的自定义功能

在本文中&#xff0c;我们将回顾DevExtreme UI组件在v22.2版本主要更新中一系列与自定义相关的增强。 DevExtreme拥有高性能的HTML5 / JavaScript小部件集合&#xff0c;使您可以利用现代Web开发堆栈&#xff08;包括React&#xff0c;Angular&#xff0c;ASP.NET Core&#x…

6、微服务组件openfeign

1、在消费端的项目中引入openfeign依赖 首先需要确保引入了springcloud&#xff0c;因为openfeign依赖与springcloud 在消费端的pom.xml中引入openfeign&#xff0c;父项目中已经引入了springcloud了 <?xml version"1.0" encoding"UTF-8"?> <…

行云创新CloudOS助力蜂巢能源获中国信通院2023云原生应用优秀案例奖

2023 年 6 月 6 日&#xff0c;工业和信息化部主办的ICT 中国高层论坛-云原生产业高峰论坛成功举办&#xff0c;活动期间&#xff0c;中国信通院发布了“2023云原生应用优秀案例奖”获奖名单。其中&#xff0c;蜂巢能源作为中国新能源行业的代表之一&#xff0c;凭借其基于行云…

IKEA EDI项目开源介绍

近期为了帮助广大用户更好地使用 EDI 系统&#xff0c;我们根据以往的项目实施经验&#xff0c;将成熟的 EDI 项目进行开源。用户安装好知行之桥EDI系统之后&#xff0c;只需要下载我们整理好的示例代码&#xff0c;并放置在知行之桥指定的工作区中&#xff0c;即可开始使用。 …

一次 Nacos 导致的 CPU 飙高问题完整复盘

今天下午突然 出现 测试环境 cpu飙高&#xff0c;干到了 60%&#xff0c;其他项目 响应时间明显变长。。。有点吓人&#xff0c;不想背锅 项目背景 出问题的项目是 需要连接各个不同nacos 和不同的 namespace 进行对应操作的 一个项目&#xff0c;对nacos的操作都是httpClien…

【工具】Maven加强版 — mvnd的使用

【工具】Maven加强版 — mvnd的使用 下载 Releases apache/maven-mvnd (github.com) 选对应的版本 我用的Windows版 1、安装 直接解压。 然后配置环境变量&#xff1a;将 bin 目录添加到 PATH 2、测试 打开CMD终端&#xff0c;输入 mvnd -v 可以看到如下信息表示安装成…

Linux入侵检测学习笔记2

查看异常流量&#xff1a; iftop&#xff1a;动态显示网络接口流量信息&#xff1a; iftop工具是一款实时流量监控工具&#xff0c;可用于监控TCP/IP连接等&#xff0c;必须以root用户的身份运行。 安装方法&#xff1a; yum install -y epel-release yum install -y iftop…

云服务器docker方式部署JAVA微服务

党建后端java微服务部署步骤&#xff08;采用docker部署&#xff09; 开通dua 开通端口号&#xff1a;8848、6379、8000 - 8010、9848、9849 step1&#xff1a;安装必要的一些系统工具 sudo apt-get update sudo apt-get install ca-certificates curl gnupg step2&#xff1a…

Spark SQL典型案例

文章目录 一、实现任务1、准备数据文件2、创建Maven项目3、修改源程序目录4、添加依赖和设置源程序目录5、创建日志属性文件6、创建HDFS配置文件7、创建词频统计单例对象 一、实现任务 1、准备数据文件 在/home目录创建words.txt hello scala world hello spark world scala…

使用 docker 搭建 mongodb 6 单节点副本集

1、拉取 mongodb 镜像 docker pull mongo 2、启动一个 mongodb 的容器&#xff0c;通过副本集的形式运行 docker run --name mongoRs -d -p 27017:27017 mongo --replSet rs0 --name 创建容器的名称。 自定义 -d 以守护进程方式启动容器 -p 2701:27017&#xff1a;MongoD…

Dockerfile 使用介绍

我们使用 Dockerfile 定义镜像&#xff0c;依赖镜像来运行容器&#xff0c;因此 Dockerfile 是镜像和容器的关键&#xff0c;Dockerfile 可以非常容易的定义镜像内容&#xff0c;同时在我们后期的微服务实践中&#xff0c;Dockerfile 也是重点关注的内容&#xff0c;今天我们就…

Android系统的问题分析笔记(9) - Android 中的 Uri 如何使用呢 ?

问题 Android 中常用的 uri 如何使用呢 &#xff1f;&#xff08;此篇分析基础为Android 7.1.1系统源码&#xff09;&#xff0c;参看Android官方说明&#xff1a;https://developer.android.com/reference/android/net/Uri&#xff0c;代码可在此查看&#xff1a;https://git…

python 模块, 包

C# 中模块&#xff0c;就好像要using dll文件 python 中模块 就是python文件 包括类、方法、变量等 from 模块名 import 功能名 功能名() import 模块名 和 from 模块名 import * 模块名都引入了&#xff0c;但使用有所区别 import 模块名 使用 模块名.功能名 from 模块名 impo…

天天使用MySQL,你知道MySQL数据库能抗多少压力吗?附(真实案例)

今天给大家分享一个知识点&#xff0c;是关于MySQL数据库架构演进的&#xff0c;因为很多兄弟天天基于mysql做系统开发&#xff0c;但是写的系统都是那种低并发压力、小数据量的&#xff0c;所以哪怕上线了也就是这么正常跑着而已&#xff0c;但是你知道你连接的这个MySQL数据库…

关于HTTP头部的重要事项,你可能不知道的!

HTTP请求就像向服务器请求某些内容&#xff0c;而HTTP响应则是服务器的回复。就像发送一条消息并收到回复一样。 HTTP请求头部是在发出请求时包含的额外信息&#xff0c;比如你要发送的数据类型或你的身份信息。在响应头部中&#xff0c;服务器提供有关发送给你的响应的信息&am…

【大学物理实验】示波器

文章目录 选择题选择题 函数信号发生器产生的电信号调节频率和调整幅度大小的旋钮是: A. 1,2 B. 2,3 C. 3,4 D. 1,4 正确答案: D 信号输入示波器Y2通道后,示波器面板上工作方式和内触发的选择应该是: A. 工作方式选Y1,内触发选Y2 B. 工作方式选Y2,内触发选Y2 C. 工作方…