2.16、生产者-消费者问题

news2024/9/22 5:31:44

系统中有一组生产者进程和一组消费者进程,生产者进程每次生产一个产品放入缓冲区,消费者进程每次从缓冲区中取出一个产品并使用。(注:这里的“产品”理解为某种数据)

生产者、消费者共享一个初始为空、大小为 n 的缓冲区。

只有缓冲区没满时,生产者才能把产品放入缓冲区,否则必须等待。

  • 同步关系。缓冲区满时,生产者要等待消费者取走产品

只有缓冲区不空时,消费者才能从中取出产品,否则必须等待。

  • 同步关系。缓冲区为空时,消费者要等待生产者放入产品

image-20230202165415248

缓冲区是临界资源,各进程必须互斥地访问

  • 若其他进程并发地向缓冲区放入数据,会导致之前的数据会被覆盖

如何用信号量机制(PV 操作)实现生产者、消费者进程的这些功能呢?

信号量机制可实现互斥、同步、对一类系统资源的申请和释放。

  • 设置初值为 1 1 1 的互斥信号量

  • 设置初值为 0 0 0 的同步信号量(实现 “一前一后”)

  • 设置一个信号量,初始值即为资源的数量(本质上也属于“同步问题”,若无空闲资源,则申请资源的进程需要等待别的进程释放资源后才能继续往下执行)


1、PV 操作题目分析步骤

  1. 关系分析。找出题目中描述的各个进程,分析它们之间的同步

  2. 整理思路。根据各进程的操作流程确定 PV 操作的大致顺序。

  3. 设置信号量。设置需要的信号量,并根据题目条件确定信号量初值。

    (互斥信号量初值一般为 1,同步信号量的初始值要看对应资源的初始值是多少)

生产者每次要消耗(P)一个空闲缓冲区,并生产(V)一个产品

消费者每次要消耗(P)一个产品,并且释放一个空闲缓冲区(V)

往缓冲区中放入/取走产品需要互斥

image-20230202171357580

2、能否改变相邻 P、V 操作的顺序?

image-20230202171529210

若此时缓冲区内已经放满产品,则 empty=0full=n

则生产者进程执行 ① 使 mutex 变为 0 ,再执行 ② ,由于已没有空闲缓冲区,因此生产者被阻塞。由于生产者阻塞,因此切换回消费者进程。消费者进程执行 ③ ,由于 mutex 为 0,即生产者还没释放对临界资源的“锁”,因此消费者也被阻塞。

这就造成了生产者等待消费者释放空闲缓冲区,而消费者又等待生产者释放临界区的情况,生产者和消费者循环等待被对方唤醒,出现 “死锁”。

同样的,若缓冲区中没有产品,即 full=0empty=n。按③④①的顺序执行就会发生死锁。

因此, 实现互斥的 P 操作一定要在实现同步的 P 操作之后 \color{red}实现互斥的\texttt{P}操作一定要在实现同步的\texttt{P}操作之后 实现互斥的P操作一定要在实现同步的P操作之后

V 操作不会导致进程阻塞,因此 两个V操作顺序可以交换 \color{red}两个\text{V}操作顺序可以交换 两个V操作顺序可以交换


image-20230202172147588

若放到 PV 操作之间会导致并发度降低,消费者花费更多的时间去使用产品,此时还没有释放临界资源,会导致生产者不断地等待消费者。

  • 例如:业务代码放入核心代码里

3、Java 案例

import java.util.Deque;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ProducerConsumerByLock {
    //互斥资源
    private static Deque<Integer> queue = new LinkedList<>();
    //最大容量
    private static int maxSize = 5;
    private static Lock lock = new ReentrantLock(false);
    private static Condition full = lock.newCondition();
    private static Condition empty = lock.newCondition();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        for (int i = 0; i < 3; i++) {
            CompletableFuture.runAsync(new Producer());
        }
        for (int i = 0; i < 3; i++) {
            CompletableFuture.runAsync(new Consumer());
        }
        //CompletableFuture.runAsync(new Consumer()); 产生的进程默认是守护进程
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    //生产者
    public static class Producer implements Runnable {
        public void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == maxSize) {
                        System.out.println("产品已满, 等待消费者进行消费, 当前生产者: " + Thread.currentThread().getName());
                        //阻塞生产者, 并释放当前线程的锁
                        try {
                            full.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    //生产产品
                    queue.offerLast(1);
                    //日志打印, 业务逻辑尽量放在锁外面 , 这里只是为了顺序打印到控制台
                    System.out.println("当前生产者生产产品 " + 1 + ", 当前生产者: " + Thread.currentThread().getName());
                    //唤醒其他所有的消费者与生产者
                    empty.signalAll();
                    full.signalAll();//并发, 让其他生产者也参与竞争
                } finally {
                    lock.unlock();
                }

                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void run() {
            produce();
        }
    }

    //消费者
    public static class Consumer implements Runnable {
        //消费产品
        public void consume() {
            while (true) {
                int x;
                lock.lock();
                try {
                    while (0 == queue.size()) {
                        System.out.println("产品为空, 等待生产者进行生产, 当前消费者: " + Thread.currentThread().getName());
                        //阻塞消费者者, 并释放当前线程的锁
                        try {
                            empty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    //从缓冲区获取资源
                    x = queue.poll();
                    //日志打印, 业务逻辑尽量放在锁外面 , 这里只是为了顺序打印到控制台
                    System.out.println("消费者消费产品 " + x + ", 当前消费者: " + Thread.currentThread().getName());
                    //唤醒其他所有的消费者与生产者
                    empty.signalAll();//并发, 让其他消费者也参与竞争
                    full.signalAll();
                } finally {
                    lock.unlock();
                }

                //防止一瞬间消费完成
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void run() {
            consume();
        }
    }

}


stdout

当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-11
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-6
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-6
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-6
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-4
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-13
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-13
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-11
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-6
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-11
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-6
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-4
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-13
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-11
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13

4、总结

PV 操作题目的解题思路

  1. 关系分析。找出题目中描述的各个进程,分析它们之间的同步

  2. 整理思路。根据各进程的操作流程确定 PV 操作的大致顺序。

  3. 设置信号量。设置需要的信号量,并根据题目条件确定信号量初值。

    (互斥信号量初值一般为 1,同步信号量的初始值要看对应资源的初始值是多少)

生产者消费者问题是一个互斥、同步的综合问题。

对于初学者来说最难的是发现题目中隐含的两对同步关系。

有时候是消费者需要等待生产者生产,有时候是生产者要等待消费者消费,这是两个不同的 “一前一后问题”,因此也需要设置两个同步信号量。

image-20230202200259479

易错点:实现互斥和实现同步的两个 P 操作的先后顺序

  • 实现互斥的操作要在实现同步的操作之后,否则会产生 “死锁”

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/341766.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux删除软链接

不防大家试试 unlink 命令 首先我们先来创建一个文件 #mkdir test_chk #touch test_chk/test.txt #vim test_chk/test.txt (这一步随便在这个test.txt里写点东东即可) 下面我们来创建test_chk目录 的软链接 #ln-s test_chk test_chk_ln 软链接创建好了&#xff0c;我们来…

应用安全系列之三十八:注入问题的成因以及预防原理

自从有了OWASP TOP的排名依赖&#xff0c;注入问题就一直排名前三&#xff0c;这就说明了注入问题对系统的影响是十分严重的&#xff0c;而且&#xff0c;注入问题一般比较容易被利用。注入问题产生的根本原因就是程序在接受到请求中的参数时&#xff0c;没有经过严格的验证和正…

前端工程师leetcode算法面试必备-二分搜索算法(下)索算法(下)

一、287. 寻找重复数 给定一个包含 n 1 个整数的数组 nums&#xff0c;其数字都在 1 到 n 之间&#xff08;包括 1 和 n&#xff09;&#xff0c;可知至少存在一个重复的整数。假设只有一个重复的整数&#xff0c;找出这个重复的数。 1、HashMap 在没有其它附加条件的情况下&…

如何处理“WLAN没有有效的IP配置”这一问题?

&#x1f680;write in front&#x1f680; &#x1f4dc;所属专栏&#xff1a;暂无 &#x1f6f0;️博客主页&#xff1a;睿睿的博客主页 &#x1f6f0;️代码仓库&#xff1a;&#x1f389;VS2022_C语言仓库 &#x1f3a1;您的点赞、关注、收藏、评论&#xff0c;是对我最大的…

svg.js使用教程

在日常web开发过程中&#xff0c;我们会需要显示一些图形化的元素&#xff0c;使用divcss、ps图片是常见的实现方式。 但使用svg来绘制可能更加合适&#xff0c;SVG是可缩放矢量图形&#xff0c;有一些预定义的形状元素&#xff0c;可被开发者使用和操作&#xff1a; 矩形(rec…

图解LeetCode——剑指 Offer 50. 第一个只出现一次的字符

一、题目 在字符串 s 中找出第一个只出现一次的字符。如果没有&#xff0c;返回一个单空格。 s 只包含小写字母。 二、示例 2.1> 示例 1: 【输入】s "abaccdeff" 【输出】b 2.2> 示例 2: 【输入】s "" 【输出】 限制&#xff1a; 0 < s 的…

Swift(5)

目录 集合类型 数组 ​编辑 合集 合集操作 字典 Where 集合类型 Swift提供了三种主要的集合类型&#xff1a;组合&#xff0c;合集&#xff0c;字典。 数组是有序的值的集合。 合集是唯一值的无序集合。 字典是无序的键值对集合。 数组 Swift数组的类型的完整写法是…

总结如何设计一款营销低代码可视化海报平台

背景 我所在的部门负责的是活动业务&#xff0c;每天都有很多的营销活动&#xff0c;随之而来的就是大量的H5活动页面。而这些H5活动已经沉淀出了比较固定的玩法交互&#xff0c;我们开发大多数的工作也只是在复制粘贴这种大量的重复工作。 在基于此背景下我开始了低代码平台…

【手写 Vuex 源码】第十篇 - Vuex 命名空间的实现

一&#xff0c;前言 上一篇&#xff0c;主要介绍了 Vuex 响应式数据和缓存的实现&#xff0c;主要涉及以下几个点&#xff1a; Vuex 的响应式实现原理&#xff1b;响应式核心方法 resetStoreVM&#xff1b;commit 和 dispatch 的处理&#xff1b; 本篇&#xff0c;继续介绍 …

jdk-concurrentHashMap(1.8)源码学习

上文&#xff1a;jdk-HashMap(1.8)源码学习concurrentHashMap介绍concurrentHashMap是一个高性能、线程安全的HashMap&#xff0c;底层数据结构主要还是以数组链表红黑树实现与HashMap的结构是一致的。concurrentHashMap1.7和1.8的区别&#xff1f;对比名称1.71.8备注线程安全是…

PhysioNet2017分类的代码实现

PhysioNet2017数据集介绍可参考文章&#xff1a;https://wendy.blog.csdn.net/article/details/128686196。本文主要介绍利用PhysioNet2017数据集对其进行分类的代码实现。 目录一、数据集预处理二、训练2.1 导入数据集并进行数据裁剪2.2 划分训练集、验证集和测试集2.3 设置训…

C语言(C语言结构基础使用)

目录 一.结构 1.结构声明 2.初始化结构 3.访问结构成员 4.结构的初始化器 5.定义无结构标记 6.结构数组 7.嵌套结构 8.复合字面量和结构&#xff08;C99&#xff09; 9.伸缩性数组成员 10.伸缩性数组得特殊处理请求 11.匿名结构&#xff08;C11&#xff09; 12.使用结构数组得函…

RiProRiProV2主题美化顶部增加一行导航header导航通知

背景: 有些网站的背景顶部有一行罪行公告,样式不错,希望自己的网站也借鉴过来,本教程将指导如何操作,并调整成自己想要的样式。 比如网友搭的666资源站 xd素材中文网

【C语言必经之路——第11节】初阶指针(2)

五、指针的运算1、指针与整数相加减看一下下面的代码&#xff1a;#include<stdio.h> int my_strlen(char* str) {int count0;while(*str!\0){count;str;//指针加减整数}return count; } int main() {int lenmy_strlen("abcdef");printf("%d\n",len);…

OpenCV实战(10)——积分图像详解

OpenCV实战&#xff08;10&#xff09;——积分图像详解0. 前言1. 积分图像计算2. 自适应阈值2.1 固定阈值的缺陷2.2 使用自适应阈值2.3 其它自适应阈值计算方法2.4 完整代码3. 使用直方图进行视觉跟踪3.1 查找目标对象3.2 完整代码小结系列链接0. 前言 我们知道直方图是通过遍…

方法递归调用

&#x1f3e1;个人主页 &#xff1a; 守夜人st &#x1f680;系列专栏&#xff1a;Java …持续更新中敬请关注… &#x1f649;博主简介&#xff1a;软件工程专业&#xff0c;在校学生&#xff0c;写博客是为了总结回顾一些所学知识点 ✈️推荐一款模拟面试&#xff0c;刷题神器…

【C++设计模式】学习笔记(4):观察者模式 Observer

目录 简介动机(Motivation)模式定义结构(Structure)要点总结笔记结语简介 Hello! 非常感谢您阅读海轰的文章,倘若文中有错误的地方,欢迎您指出~ ଘ(੭ˊᵕˋ)੭ 昵称:海轰 标签:程序猿|C++选手|学生 简介:因C语言结识编程,随后转入计算机专业,获得过国家奖学金…

渣土车智能识别检测 yolov5

渣土车智能识别检测通过yolov5网络模型深度学习技术&#xff0c;渣土车智能识别检测对禁止渣土车通行现场画面中含有渣土车时进行自动识别监测&#xff0c;并自动抓拍告警。YOLOv5是一种单阶段目标检测算法&#xff0c;该算法在YOLOv4的基础上添加了一些新的改进思路&#xff0…

【Redis场景3】缓存穿透、击穿问题

场景问题及原因 缓存穿透&#xff1a; 原因&#xff1a;客户端请求的数据在缓存和数据库中不存在&#xff0c;这样缓存永远不会生效&#xff0c;请求全部打入数据库&#xff0c;造成数据库连接异常。 解决思路&#xff1a; 缓存空对象 对于不存在的数据也在Redis建立缓存&a…

spark01-内存数据分区数量个数原理

原始代码如下&#xff1a;val conf: SparkConf new SparkConf().setMaster("local[*]").setAppName("wordcount")val scnew SparkContext(conf)val rdd: RDD[Int] sc.makeRDD(List(1,2,3,4)//将处理的数据保存分区文件rdd.saveAsTextFile("output2&…