Linux之生产者消费者模型(上)——单生产者单消费者

news2024/10/6 6:45:49

文章目录

  • 前言
  • 一、生产者消费者模型
    • 1.生产消费
    • 2.生产消费关系
      • 321原则
      • 生产消费模型的特点
  • 二、基于阻塞队列(blockqueue)的生产消费模型
    • 1.概念
    • 2.单生产单消费模型
      • 代码
      • 运行
      • 分析
      • 两种情况导致的现象
        • 生产者生产的慢,消费者消费的快
        • 生产者生产的快,消费者消费的慢
  • 三、实例——计算任务Task
  • 总结


前言

本文介绍了生产者消费者模型的概念以及单生产单消费的例子(含代码和运行结果分析)。


一、生产者消费者模型

1.生产消费

引入:
举个例子,我们想买个生活用品,但是没有交易场所的话,我们就只能直接去供货商那里去买。我们每人每次买一两件,对于供货商来说,为了这一两件商品去开启厂子里的机器进行生产,是很亏本的事情。因此,有了交易场所——超市等存在,它们作为交易商品的媒介,工作就是集中需求,分发产品。
消费者和生产者之间通过超市进行交易。当消费者没有消费的同时,生产者也可以继续生产;当消费者过来消费的同时,生产者也可以停止生产(例子:周内生产者上班生产商品,学生上学不来超市购买商品;周末生产者放假休息,不进行生产工作,学生过来超市购买商品)。由此,生产和消费这两件事就可以解耦了,我们把临时保存产品的场所称为缓冲区
在这里插入图片描述

2.生产消费关系

首先,生产和消费都要看到同一块资源——“超市”,因此“超市”必须是一个共享资源。既然是共享资源,又被两个线程(生产和消费)并发访问,那么该共享资源需要被保护起来。

321原则

  1. 三种关系:生产者和消费者互斥,消费者和消费者互斥,生产者和消费者同步。互斥是为了保证共享资源的安全性,同步是为了提高访问效率。
  2. 两种角色:生产者线程,消费者线程;
  3. 一个场所:一段特定结果的缓冲区。

想写生产消费模型,本质就是维护321原则。

生产消费模型的特点

  1. 生产线程和消费线程要进行解耦;
  2. 支持生产和消费可能有一段时间的忙闲不均问题(因此,缓冲区要有足够的空间,提前预存数据);
  3. 生产者专注生产,消费者专注消费(互相不影响),从而提高效率。
  4. 特殊的,“超市”缓冲区满了,生产者线程只能进行阻塞(等待),等待消费者消费数据;“超市”缓冲区空了,消费者线程只能进行阻塞(等待),等待生产者生产数据。

二、基于阻塞队列(blockqueue)的生产消费模型

1.概念

阻塞队列:blockqueue,是一种常用于实现生产者和消费者模型的数据结构。
阻塞队列为空时,从阻塞队列中获取元素的线程将被阻塞,直到阻塞队列被放入元素;
阻塞队列已满时,往阻塞队列中放置元素的线程将被阻塞,直到阻塞队列有元素被取出。
在这里插入图片描述

2.单生产单消费模型

代码

本例子让生产者线程生产随机数,消费者消费生产出的数字。
文件BlockQueue.hpp

  1 #include<iostream>
  2 using namespace std;
  3 #include<pthread.h>
  4 #include<time.h>
  5 #include<queue>
  6 const int gmaxcap = 5;
  7 template<class T>
  8 class BlockQueue
  9 {
 10 public:
 11         BlockQueue(const int& maxcap = gmaxcap)
 12         :_maxcap(maxcap)
 13         {
 14                 pthread_mutex_init(&_mutex, nullptr);
 15                 pthread_cond_init(&_pcond, nullptr);
 16                 pthread_cond_init(&_ccond, nullptr);
 17         }
 18         void push(const T& in)
 19         {
 20                 pthread_mutex_lock(&_mutex);
 21                 while(is_full())//队列已满
 22                 {
 23                         pthread_cond_wait(&_pcond, &_mutex);//生产者线程被阻塞
 24                 }
 25                 _q.push(in);
 26                 pthread_cond_signal(&_ccond);//唤醒消费者线程;它可以放在临界区内部被锁保护,也可以放在临界区外部
 27                 pthread_mutex_unlock(&_mutex);
 28         }
 29         void pop(T& out)//out是输出型参数,
 30         {
 31                 pthread_mutex_lock(&_mutex);
 32                 while(is_empty())//队列已空
 33                 {
 34                         pthread_cond_wait(&_ccond, &_mutex);//消费者线程被阻塞
 35                 }
 36                 out = _q.front();
 37                 _q.pop();
 38                 pthread_cond_signal(&_pcond);//唤醒生产者线程;它可以放在临界区内部被锁保护,也可以放在临界区外部
 39                 pthread_mutex_unlock(&_mutex);
 40         }
 41         ~BlockQueue()
 42         {
 43                 pthread_mutex_destroy(&_mutex);
 44                 pthread_cond_destroy(&_pcond);
 45                 pthread_cond_destroy(&_ccond);
 46         }
 47 private:
 48         bool is_empty(){return _q.empty();}
 49         bool is_full(){return _q.size() == _maxcap;}
 50         queue<int> _q;
 51         int _maxcap;//队列中元素的上限
 52         pthread_mutex_t  _mutex;
 53         pthread_cond_t _pcond;//生产者条件变量
 54         pthread_cond_t _ccond;//消费者条件变量
 55 };

文件main.cc

  1 #include"BlockQueue.hpp"
  2 #include<unistd.h>
  3 void* consumer(void* args)
  4 {
  5         BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
  6         while(1)
  7         {
  8                 int data = 0;
  9                 bq -> pop(data);
 10                 cout<<"消费数据:"<<data<<endl;
 11                 sleep(1);
 12         }
 13         return nullptr;
 14 }
 15 void* productor(void* args)
 16 {
 17         BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
 18         while(1)
 19         {
 20                 int data = rand()%10 + 1;
 21                 bq -> push(data);
 22                 cout<<"生产数据:"<<data<<endl;
 23         }
 24         return nullptr;
 25 }
 26 int main()
 27 {
 28         srand((unsigned int)time(nullptr));
 29         BlockQueue<int>* bq = new BlockQueue<int>();
 30         pthread_t con, pro;
 31         pthread_create(&con, nullptr, consumer, bq);
 32         pthread_create(&pro, nullptr, productor, bq);
 33         pthread_join(con, nullptr);
 34         pthread_join(pro, nullptr);
 35         return 0;
 36 }

运行

在这里插入图片描述

分析

当队列满了以后,生产者就需要进行等待,如果像未满时的那样将锁拿走,那么其它线程就无法访问共享资源了。
因此,pthread_cond_wait函数的第二个参数,是我们正在使用的互斥锁。
pthread_cond_wait函数,以原子性的方式将锁释放,并且把调用自己的线程挂起。同时,当挂起的线程被唤醒时会自动重新获取传入的锁。
pthread_cond_signal:唤醒线程,但是一次只会唤醒一个线程。单生产单消费用signal就可以(生产和消费的都只有一个线程)。
pthread_cond_broadcast:唤醒线程,一次唤醒一批(很多线程),如果使用它唤醒线程,那么就必须用while判断满和空的情况(此时如果用if,就会出现问题,因为这一批线程都会生产/消费,但是同一时间消费/生产的只有一个,也就是同一时间只会消费/生产一个数据,用if判断的话push时就会出现问题)。

两种情况导致的现象

生产者生产的慢,消费者消费的快

表现出来的现象:生产一个消费一个,而且消费额的都是最新生产的数据。
生产者线程:生产一个数据sleep(2);
在这里插入图片描述

生产者生产的快,消费者消费的慢

稳定后表现出来的现象:消费一个生产一个。
消费者线程:消费一个数据sleep(3);
在这里插入图片描述
刚开始,一瞬间就将队列生产满了,然后进入消费一个生产一个的情况。

三、实例——计算任务Task

文件Task.hpp

  1 #pragma once
  2 #include<iostream>
  3 using namespace std;
  4 #include<functional>
  5 #include<stdio.h>
  6 class Task
  7 {
  8         using func_t = function<int(int, int, char)>;
  9 public:
 10         Task(){}
 11         Task(int x, int y, char op, func_t func)
 12         :_x(x),
 13         _y(y),
 14         _op(op),
 15         _callback(func)
 16         {}
 17         string operator()()
 18         {
 19                 int result = _callback(_x, _y, _op);
 20                 char buffer[1024];
 21                 snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
 22                 return buffer;
 23         }
 24         string toTaskString()
 25         {
 26                 char buffer[1024];
 27                 snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
 28                 return buffer;
 29         }
 30 private:
 31         int _x, _y;
 32         char _op;
 33         func_t _callback;//回调函数
 34 };

文件BlockQueue.hpp

  1 #pragma once
  2 #include<iostream>
  3 using namespace std;
  4 #include<pthread.h>
  5 #include<time.h>
  6 #include<queue>
  7 const int gmaxcap = 5;
  8 template<class T>
  9 class BlockQueue
 10 {
 11 public:
 12         BlockQueue(const int& maxcap = gmaxcap)
 13         :_maxcap(maxcap)
 14         {
 15                 pthread_mutex_init(&_mutex, nullptr);
 16                 pthread_cond_init(&_pcond, nullptr);
 17                 pthread_cond_init(&_ccond, nullptr);
 18         }
 19         void push(const T& in)
 20         {
 21                 pthread_mutex_lock(&_mutex);
 22                 while(is_full())//队列已满
 23                 {
 24                         pthread_cond_wait(&_pcond, &_mutex);//生产者线程被阻塞
 25                 }
 26                 _q.push(in);
 27                 pthread_cond_signal(&_ccond);//唤醒消费者线程;它可以放在临界区内部被锁保护,也可以放在临界区外部
 28                 pthread_mutex_unlock(&_mutex);
 29         }
 30         void pop(T* out)//out是输出型参数,
 31         {
 32                 pthread_mutex_lock(&_mutex);
 33                 while(is_empty())//队列已空
 34                 {
 35                         pthread_cond_wait(&_ccond, &_mutex);//消费者线程被阻塞
 36                 }
 37                 *out = _q.front();
 38                 _q.pop();
 39                 pthread_cond_signal(&_pcond);//唤醒生产者线程;它可以放在临界区内部被锁保护,也可以放在临界区外部
 40                 pthread_mutex_unlock(&_mutex);
 41         }
 42         ~BlockQueue()
 43         {
 44                 pthread_mutex_destroy(&_mutex);
 45                 pthread_cond_destroy(&_pcond);
 46                 pthread_cond_destroy(&_ccond);
 47         }
 48 private:
 49         bool is_empty(){return _q.empty();}
 50         bool is_full(){return _q.size() == _maxcap;}
 51         queue<T> _q;
 52         int _maxcap;//队列中元素的上限
 53         pthread_mutex_t  _mutex;
 54         pthread_cond_t _pcond;//生产者条件变量
 55         pthread_cond_t _ccond;//消费者条件变量
 56 };

文件test.cc

 #include<time.h>
  2 #include<sys/types.h>
  3 #include<unistd.h>
  4 #include"BlockQueue.hpp"
  5 #include"Task.hpp"
  6 string oper = "+-*/%";
  7 int task(int x, int y, char op)
  8 {
  9         int result = 0;
 10         switch(op)
 11         {
 12                 case'+':
 13                 result = x + y;
 14                 break;
 15                 case'-':
 16                 result = x - y;
 17                 break;
 18                 case'*':
 19                 result = x * y;
 20                 break;
 21                 case'/':
 22                 {
 23                         if(y == 0)
 24                         {
 25                                 cerr<<"div zero error!"<<endl;
 26                                 result = -1;
 27                         }
 28                         else
 29                         result = x / y;
 30                 }
 31                 break;
 32                 case'%':
 33                 {
 34                         if(y == 0)
 35                         {
 36                                 cerr<<"mod zero error!"<<endl;
 37                                 result = -1;
 38                         }
 39                         else
 40                         result = x % y;
 41                 }
 42                 break;
 43                 default:
 44                 break;
 45         }
 46         return result;
 47 }
 48 void* consumer(void* args)
 49 {
 50         BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
 51         while(1)
 52         {
 53                 Task t;
 54                 bq -> pop(&t);
 55                 cout<<"消费任务:"<<t()<<endl;
 56         }
 57         return nullptr;
 58 }
 59 void* productor(void* args)
 60 {
 61         BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
 62         while(1)
 63         {
 64                 int x = rand() % 100 + 1;
 65                 int y = rand()%10;
 66                 int opcode = rand()% oper.size();
 67                 Task t(x, y, oper[opcode], task);
 68                 bq -> push(t);
 69                 cout<<"生产任务:"<<t.toTaskString()<<endl;
 70                 sleep(1);
 71         }
 72         return nullptr;
 73 }
 74 int main()
 75 {
 76         srand((unsigned int)time(nullptr));
 77         BlockQueue<Task>* bq = new BlockQueue<Task>();
 78         pthread_t con, pro;
 79         pthread_create(&con, nullptr, consumer, bq);
 80         pthread_create(&pro, nullptr, productor, bq);
 81         pthread_join(con, nullptr);
 81         pthread_join(con, nullptr);
 82         pthread_join(pro, nullptr);
 83         return 0;
 84 }

运行:
在这里插入图片描述


总结

以上就是今天要讲的内容,本文介绍了Linux多线程中生产消费模型的相关概念。本文作者目前也是正在学习Linux相关的知识,如果文章中的内容有错误或者不严谨的部分,欢迎大家在评论区指出,也欢迎大家在评论区提问、交流。
最后,如果本篇文章对你有所启发的话,希望可以多多支持作者,谢谢大家!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/663577.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

精彩回顾 | “XR云新未来:弹性算力赋能可交互、沉浸式商业实践” 赋能云端虚拟世界

6月15日&#xff0c;由平行云联合首都在线共同主办&#xff0c;中关村软件园协办&#xff0c;以“XR云新未来|弹性算力赋能可交互、沉浸式商业实践”为主题的XR行业交流盛会在北京成功举办。 活动邀请多位XR行业大咖&#xff0c;共同见证首都在线联合平行云发布Cloud XR平台。…

MySQL数据库——索引

MySQL数据库——索引 一、索引基本常识1.索引的概念2.索引的作用3.创建索引的依据 二、索引的分类1.普通索引2.唯一索引3.主键索引4.组合索引5.全文索引 三、索引的查看与删除1.查看索引2.删除索引 一、索引基本常识 数据库索引是数据库管理系统中一个排序的数据结构&#xff0…

OpenGL 深度测试

1.简介 深度缓冲就像颜色缓冲(Color Buffer)&#xff08;储存所有的片段颜色&#xff1a;视觉输出&#xff09;一样&#xff0c;在每个片段中储存了信息&#xff0c;并且&#xff08;通常&#xff09;和颜色缓冲有着一样的宽度和高度。深度缓冲是由窗口系统自动创建的&#xf…

6 从0开始学PyTorch | 构建模型、损失函数、广播机制

前面都在学一些PyTorch的基本操作&#xff0c;从这一节开始&#xff0c;真正进入到模型训练的环节了。原作者很贴心的一步步教我们实现训练步骤&#xff0c;并且还从一个最简单的例子出发&#xff0c;讲了优化方案。 宏观上的训练过程 image.png 当然这里所说的训练还没有到深…

vue进阶-vue-cli

CLI是Command-Line Interface&#xff0c;翻译为命令行界面&#xff0c;但是俗称脚手架。 Vue-CLI是一个官方发布 vue.js 项目脚手架&#xff0c;使用 vue-cli 可以快速搭建 Vue 开发环境以及对应的 webpack 配置。 vue项目相关文件以 .vue 为后缀&#xff0c;需要事先安装 N…

【LeetCode热题100】打卡第25天:柱状图中最大的矩形

文章目录 柱状图中最大的矩形⛅前言&#x1f512;题目&#x1f511;题解 柱状图中最大的矩形 ⛅前言 大家好&#xff0c;我是知识汲取者&#xff0c;欢迎来到我的LeetCode热题100刷题专栏&#xff01; 精选 100 道力扣&#xff08;LeetCode&#xff09;上最热门的题目&#xf…

GEE:绘制一个点的Landsat1985-2020年逐日NDVI时间序列折线图

作者:CSDN @ _养乐多_ 本文记录了在GoogleEarthEngine(GEE)平台上选择一个点,根据该点在时间段内所有有效像素值绘制折线图的代码。 结果如下图所示, 文章目录 一、代码二、代码链接一、代码 var roi = geometry Map.addLayer(roi, {color

6.15集合1 和 泛型

举例 1&#xff1a;中药店&#xff0c;每个抽屉外面贴着标签 举例 2&#xff1a;超市购物架上很多瓶子&#xff0c;每个瓶子装的是什么&#xff0c;有标签 举例 3&#xff1a;家庭厨房中 集合 我们接下来要学习的内容是Java基础中一个很重要的部分&#xff1a;集合 1 Coll…

【服务器数据恢复】AIX下raid故障导致pool无法加载的数据恢复案例

服务器数据恢复环境&#xff1a; IBM P740小型机AIX操作系统Sybase数据库V7000存储。V7000存储配置了12块SAS机械硬盘&#xff08;其中一块为热备盘&#xff09;组建一组raid5磁盘阵列。存储设备一共创建了2组Mdisk&#xff0c;加到一个pool中。 服务器故障&#xff1a; IBM V…

网络管理与维护(三)网络安全

网络安全 网络安全威胁 非授权访问 信息泄漏 破坏数据完整性 拒绝服务攻击 利用网络传播病毒 安全服务和安全机制 总结 1.通常可以把网络信息安全的问题划分为物理层、网络层、数据层和内容层四个层面。 2.网络存在的威胁主要表现&#xff1a; 非授权访问、信息泄漏、破坏数…

基于Django+Vue开发的社区疫情管理系统(附源码)

基于Django、Django Rest framework、Vue的前后端分离的社区疫情管理系统。 一、系统功能 用户管理&#xff08;只有管理员有权限&#xff09; 用户注册用户登录修改用户信息删除用户修改密码权限管理 首页数据展示 国内疫情数据展示国内疫情新闻近30日的感染人数&#xff08;…

【论文阅读】Adap-t: Adaptively Modulating Embedding Magnitude for Recommendation

【论文阅读】Adap-&#x1d70f;: Adaptively Modulating Embedding Magnitude for Recommendation 文章目录 【论文阅读】Adap-&#x1d70f;: Adaptively Modulating Embedding Magnitude for Recommendation1. 来源2. 介绍3. 模型解读3.1 准备工作3.1.1 任务说明3.1.2 基于嵌…

Linux:http服务(Apache 2.4.57)源码编译——配置网站 || 入门到入土

目录 1.下载源码包 2.配置httpd运行环境 3.编译源码包安装apache软件 4.优化执行路径 5.添加httpd系统服务 正文 1.httpd服务器的基本配置 2.本章持续更新 我的服务器为centos7系统 1.下载源码包 访问官方网站↓↓↓ Welcome! - The Apache HTTP Server Project ↑↑…

ONLYOFFICE Docs 7.4 版本大大增强了图形编辑功能!

ONLYOFFICE Docs 7.4 版本大大增强了图形编辑功能&#xff01; 书接上文&#xff1a; 北冰洋汽水我的最爱https://mp.weixin.qq.com/s?__bizMzI2MjUyNzkyNw&mid2247493734&idx1&sn416c4ee5756ea59883591d3c2c4a6ae4&chksmea4b66bedd3cefa89050e25b661e0be16…

malloc 背后的虚拟内存 和 malloc实现原理

面试的时候经常会被问到 malloc 的实现。从操作系统层面来说&#xff0c;malloc 确实是考察面试者对操作系统底层的存储管理理解的一个很好的方式&#xff0c;涉及到虚拟内存、分页/分段等。下面逐个细说。 1. 虚拟内存 首先需要知道的是程序运行起来的话需要被加载的物理内存…

spark 和 flink 的对比

一、设计理念 Spark 的数据模型是 弹性分布式数据集 RDD(Resilient Distributed Dattsets)&#xff0c;这个内存数据结构使得spark可以通过固定内存做大批量计算。初期的 Spark Streaming 是通过将数据流转成批 (micro-batches)&#xff0c;即收集一段时间(time-window)内到达的…

【计算机组成原理】——知识点复习(期末不挂科版)

课本&#xff1a; 考试题型&#xff1a; 题型一、计算题&#xff08;30分&#xff09; 1、定点数表示&#xff1a;用原码、反码、补码、移码表示十进制数&#xff08;5分&#xff09; 2、浮点数表示&#xff1a;十进制数↔单精度浮点数&#xff08;5分&#xff09; 3、加减运…

行业报告 | AI+制造业赋能,机器视觉开启掘金新大陆(上)

原创 | 文 BFT机器人 01 核心要点 Al制造业赋能&#xff0c;META 发布 SAM 助力机器视觉迎来 GPT 时刻。 机器视觉技术使得工业设备能够“看到”它正在进行的操作并进行快速决策&#xff0c;完整机器视觉系统由硬件软件组成,分别进行成像和图像处理工作。 目前&#xff0c;以“…

全网超全,接口自动化测试实战总结详全,这几个阶段你知道吗?

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 接口自动化根本目…

【深入浅出 Spring Security(十)】权限管理的概述和使用详情

权限管理 一、授权的核心概念二、权限管理策略权限表达式&#xff08;SpEL Spring EL&#xff09;1. 基于 URL 的权限管理&#xff08;过滤器&#xff09;基本用法 2. 基于 方法 的权限管理&#xff08;AOP&#xff09;EnableGlobalMethodSecurity基本用法 三、权限管理之版本问…