并发编程之生产者消费者模型

news2025/1/19 3:22:55

什么是生产者消费者模型

生产者消费者模型是多线程中一个比较典型的模型。

打个比方:你是一个客户,你去超市里买火腿肠。

这段话中的 "你"就是消费者, 那么给超市提供火腿肠的供货商就是生产者。超市呢?超市是不是被所有人所共享?大家都可以去访问超市,所以这里的超市是一份临界资源。

所以生产者消费者有三种关系,两种角色,一个交易场所。

三种关系:

1.生产者与生产者

2.消费者与消费者

3.生产者与消费者

生产者与生产者是竞争关系,因为厂商之间互相竞争。所以生产与生产者是互斥关系。

消费者与消费者其实也是竞争关系,但是因为商品够多,而消费者消费速度太慢,所以没有明显的区别。但如果世界上只剩下最后一瓶矿泉水了,那是不是大家都会去抢呢? 所以消费者与消费者其实也是互斥关系。

生产者与消费者也是竞争关系,我们生产者和消费者看成两个线程,超市看成一份临界资源。那么这两个线程是不是都要访问这个临界资源?既然都要访问这个临界资源,那么生产和消费者也是互斥关系。但不仅仅是互斥,因为生产者把超市装满了,是不是要等待用户来消费?同理如果超市空了,消费者是不是要等待生产者来供货?所以生产和消费者还有一层关系,那就是同步

两种角色

生产者与消费者

一个交易场所

一份临界资源,生产者向临界资源提供数据,消费者从临界资源中拿数据。

有没有发现生产与消费者模型很像管道?没错,管道就是典型的生产者与消费者模型。

这是一个多生产者多消费者的模型。

在这里插入图片描述

接下来我们就来实现一个基于阻塞队列的生产者消费者模型。这里的阻塞队列冲当的就是临界资源,生产者把数据放进阻塞队列,消费者把数据从阻塞队列中拿出。

锁的封装

首先我们用RAII风格的锁。

MyLock类

#include<pthread.h> 
class MyLock
    {
    public:
        MyLock(pthread_mutex_t* pmtx): _pmtx(pmtx){}
        void Lock(){ pthread_mutex_lock(_pmtx);}
        void Unlock() { pthread_mutex_unlock(_pmtx);}
    private:
        pthread_mutex_t* _pmtx;
    };

LockGuard类

#include<pthread.h>
class LockGuard
    {
    public:
        LockGuard(pthread_mutex_t* pmtx):_mtx(pmtx){
            _mtx.Lock();
        }
        ~LockGuard()
        {
            _mtx.Unlock();
        }

    private:
        MyLock _mtx;
    };

这个类的构造函数是加锁,析构函数是解锁。所以我们只需要创建一个这个类的对象的代码和临界资源的代码放在一起,就可以实现加锁和解锁了。这种方式可以避免有时候解锁忘记写了导致死锁的问题。

阻塞队列的实现

block_queue类的声明

#include<queue>
#include<pthread.h>
#include<iostream>
#include "Task.hpp"
#include "LockGuard.hpp" 
#define DEFAULT_NUM 5
template<class T> //因为不确定阻塞队列放的数据类型, 所以用模板参数
    class block_queue
    {
        private:
        size_t _num; //阻塞队列的容量
        std::queue<T> _blockqueue;  //阻塞队列
        pthread_mutex_t _mtx;  //锁
        pthread_cond_t _full;  //条件变量,让生产者在阻塞队列为满时进行等待
        pthread_cond_t _empty;  //条件变量,让消费者在阻塞队列为空时进行等待
        public: 
        block_queue(size_t num = DEFAULT_NUM); //构造函数

        ~block_queue(); // 析构
        //生产者生产
        void Push(const T& task);
        // 消费者消费
        void Pop(T* out);

        private:
        //让当前线程在指定的条件变量下等待
        void Wait(pthread_cond_t* cond) {pthread_cond_wait(cond,&_mtx);}
        //唤醒指定条件变量下等待的线程
        void Wakeup(pthread_cond_t* cond) {pthread_cond_signal(cond);}
        //判断阻塞队列是否满了
        bool isfull() { return _blockqueue.size() == _num;}
        //判断阻塞队列是否为空
        bool isempty() { return _blockqueue.size() == 0;}

    };

我们的阻塞队列实际上只提供2个操作,一个是push(生产者放数据),一个是pop(消费者拿数据)。

block_queue类的实现


#define DEFAULT_NUM 5
template<class T>
    class block_queue
    {
        private:
        size_t _num;
        std::queue<T> _blockqueue; 
        pthread_mutex_t _mtx; 
        pthread_cond_t _full; 
        pthread_cond_t _empty; 

        public: 
        block_queue(size_t num = DEFAULT_NUM) : _num(num){
            pthread_mutex_init(&_mtx,nullptr);
            pthread_cond_init(&_full,nullptr);
            pthread_cond_init(&_empty,nullptr);
        }
        ~block_queue()
        {
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_full);
            pthread_cond_destroy(&_empty);
        }

        //生产者生产
        void Push(const T& task)
        {
            LockGuard lockguard(&_mtx); //加锁,出了作用域自动解锁
            while(isfull()) Wait(&_full); //生产队列已满,生产者在full条件变量下等待
            //被唤醒后添加任务到生产队列
            _blockqueue.push(task);
            printf("%p 生产了一个任务 : %d %c %d\n",pthread_self(),task._x,task._op,task._y); //这是对任务的打印....暂且无视,等Task类实现完后看结果的
            Wakeup(&_empty); //唤醒消费者
        }

        // 消费者消费
        void Pop(T* out)
        {
            LockGuard lockguard(&_mtx) ;//加锁,出了作用域自动解锁
            while(isempty()) Wait(&_empty); //生产队列已空,消费者进入等待 
            //被唤醒后添加任务到生产队列
            *out = _blockqueue.front(); //提取任务
            _blockqueue.pop(); //队列pop
            Wakeup(&_full);
        }
        private:
        void Wait(pthread_cond_t* cond) {pthread_cond_wait(cond,&_mtx);}
        void Wakeup(pthread_cond_t* cond) {pthread_cond_signal(cond);}
        bool isfull() { return _blockqueue.size() == _num;}
        bool isempty() { return _blockqueue.size() == 0;}
    };

Task类实现

我们可以往阻塞队列里面放数据,当然也可以往里面放一个任务。这里我们就创建一个加减乘除取模运算的任务类。

#include <iostream>

class Task{
    public:
        Task(){}
        Task(int x, char op,int y):_x(x),_op(op),_y(y),_iserror(false){}
    
        void Runing()
        {
            int ret = 0;
            switch(_op)
            {
                case '+' : ret = _x + _y; break; 
                case '-' : ret = _x - _y; break;
                case '*' : ret = _x * _y; break;
                case '/' :
                { 
                    if(_y) ret = _x / _y;
                    else _iserror = true;
                    break;
                }
                case '%' :
                { 
                    if(_y) ret = _x % _y;
                    else _iserror = true;
                    break;
                }
                default: _iserror = true; 
            }
            if(_iserror) std::cout << "result error" << std::endl;  //如果结果错误打印错误
            else std::cout << _x << _op << _y << "=" << ret << std::endl; //如果结果正确打印完整式子
        }
    public:
        int _x; //第一个操作数
        char _op; //操作符
        int _y; //第二个操作数
        bool _iserror; //结果是否错误
    };

Main

`

#include "BlockQueue.hpp"
#include <time.h>
#include<unistd.h>
#include<string>

#define CONNUM 5 
#define PRODNUM 2

//生产者放任务
void* ProcuderRuning(void* args)
{
    wyl::block_queue<wyl::Task>* bq = (wyl::block_queue<wyl::Task>*)args;
    while(1)
    {
        int x = rand() % 10 + 1;
        int y =  rand()%20;
        char op = "+-*/%"[rand() % 5];
        bq->Push(wyl::Task(x,op,y)); //往阻塞队列中放任务
    }
}

//消费不断拿任务
void* ConsumerRuning(void* args)
{
    wyl::block_queue<wyl::Task>* bq = (wyl::block_queue<wyl::Task>*)args;
    while(1)
    {
        wyl::Task t; 
        bq->Pop(&t); //从阻塞队列中拿任务
        printf("%p 消费了一个任务",pthread_self());
        t.Runing(); //处理任务
        sleep(1); //让消费者不要频繁消费太快,这样阻塞队列满了会等待消费者
    }
}

int main()
{
 	pthread_t con[CONNUM]; 
    pthread_t prod[PRODNUM]; 
    srand((unsigned int)0); //随机数种子
    //创造等待队列
    wyl::block_queue<wyl::Task>* bq = new wyl::block_queue<wyl::Task>(5);

    //创建生产者线程
    for(int i = 0 ; i < PRODNUM ; i++)
    {
        std::string name = "prodcuer ";
        name += std::to_string(i+1); 
        pthread_create(prod + i,nullptr,ProcuderRuning,(void*)bq);
    }
    
    //创建消费者线程
    for(int i = 0 ; i < CONNUM ; i++)
    {
        std::string name = "consumer ";
        name += std::to_string(i+1); 
        pthread_create(con + i,nullptr,ConsumerRuning,(void*)bq);
    }
    
    //等待线程
    for(int i = 0 ; i < PRODNUM ; i++)
    {
        pthread_join(prod[i],nullptr);
    }

    for(int i = 0 ; i < CONNUM ; i++)
    {
        pthread_join(con[i],nullptr);
    }

    return 0;
}

`

消费者慢消费,生产者快生产的执行结果:

在这里插入图片描述

生产者慢生产,消费者快消费的运行结果:

在这里插入图片描述

我们会发现,任务井然有序的执行。生产者放了数据后通知消费拿,消费者把数据拿完又会通知生产者放。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1212106.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

可怕!.Net 8正式发布了,.Net野心确实不小!

随着三天.NET Conf 2023的会议结束了&#xff0c;.Net 8正式发布了。 .Net 8是官方号称有史以来性能最快的一个版本了。 .Net 8 增加了数以千计的性能、稳定性和安全性改进&#xff0c;以及平台和工具增强功能&#xff0c;有助于提高开发人员的工作效率和创新速度。 反正就是…

Oneid 图计算思路

一、前文 oneid 是用户画像的核心&#xff0c;此文提供图计算的具体方案。 二、方案 注意事项&#xff1a; 1. 业务存在解绑信息&#xff0c;当不与其他业务系统产生关联时&#xff0c;沿用旧oneid。 2. oneid 需要自增&#xff0c;下游系统会用到bitmap等数据类型&#xff0…

学习c#的第十三天

目录 C# 多态性 静态多态性 函数重载 运算符重载 动态多态性 virtual 和 abstract 抽象方法和虚方法的区别 重载(overload)和重写(override) 隐藏方法 C# 多态性 多态是同一个行为具有多个不同表现形式或形态的能力。 多态性意味着有多重形式。在面向对象编程范式中…

自由曲线与曲面 -计算机图形学

目录 自由曲线与曲面 函数的连续性 &#xff08;1&#xff09;参数连续性 &#xff08;2&#xff09;几何连续性 bezier 曲线 Bernstein基函数 *公式看不懂&#xff0c;带几个数进去看看&#xff0c;你就更好地可以看到这个公式的本质了 凸包性质 仿射不变性 …

PyCharm 【unsupported Python 3.1】

PyCharm2020.1版本&#xff0c;当添加虚拟环境发生异常&#xff1a; 原因&#xff1a;Pycharm版本低了&#xff01;不支持配置的虚拟环境版本 解决&#xff1a;下载PyCharm2021.1版本&#xff0c;进行配置成功&#xff01;

mysql之搭建MMM架构实现高可用

实验目的 解决mysql的主从服务器单点故障问题&#xff0c;实现高可用 实验思路 实验条件&#xff1a; 主机名 作用 IP地址 组件 mysql1 master01 20.0.0.13 mysql服务、mysql-mmm mysql2 masert02 20.0.0.23 mysql服务、mysql-mmm mysql3 slave01 20.0.0.33 …

C# 使用Microsoft.Office.Interop.Excel库操作Excel

1.在NuGet管理包中搜索&#xff1a;Microsoft.Office.Interop.Excel&#xff0c;如下图红色标记处所示&#xff0c;进行安装 2. 安装完成后&#xff0c;在程序中引入命名空间如下所示&#xff1a; using Microsoft.Office.Interop.Excel; //第一步 添加excel第三方库 usi…

kubernetes集群编排——prometheus监控

部署prometheus 创建项目仓库并上传镜像 编写配置文件 [rootk8s2 values]# vim prometheus-values.yaml alertmanager:alertmanagerSpec:image:repository: prometheus/alertmanagertag: v0.24.0 grafana:enabled: trueimage:repository: grafana/grafanatag: 9.0.6service:typ…

【开发问题解决方法记录】01.dian

一些问题记录 新增角色失败&#xff1a;Error: Ajax 调用为Execute Server-Side Code返回了服务器错误ORA-01722: 无效数字。 【问题原因】&#xff1a;CREATE_BY(NUMBER类型)应该存入USER_ID(NUMBER类型)而非USER_NAME&#xff08;NVARCHAR2类型&#xff09; 【解决方法】将…

一篇文章让你真正搞懂epoll机制

目录 1.epoll简介 2.epoll实现原理 3.创建epoll文件 4.增加&#xff0c;删除&#xff0c;修改epoll事件 5.epoll事件就绪 6.epoll编程流程 7.epoll常见问题&#xff1f; 1.epoll简介 epoll是Linux内核为处理大批量文件描述符而作了改进的poll&#xff0c;它能显著提高程…

一篇文章让你搞懂 MySQL 的锁

一篇文章让你搞懂 MySQL 的锁 1、并发问题的解决方案2、MySQL的各类型锁2.1、从数据操作的类型划分 (读锁、写锁)2.2、从数据操作的粒度划分2.2.1、表锁2.2.1.1、表级别的S 锁、X 锁2.2.1.2、意向锁&#xff08;IS、IX&#xff09;2.2.1.3、自增锁2.2.1.4、元数据锁 2.2.2、行锁…

亚马逊收到CPSC查验通知后卖家需要怎么弄?ASTM F963标准测试 ,CPC认证

收到CPSC查验亚马逊卖家需要怎么做&#xff1f; 。CPSC消费品安全协会&#xff0c;成立于1972年&#xff0c;它的责任是保护广大消费者的利益&#xff0c;通过减少消费品存在的伤害及死亡的危险来维护人身及家庭安全。CPSC现在负责对超过15000种消费品的安全监控&#xff0c;具…

【送书福利-第二十七期】《边缘计算系统设计与实践》

&#x1f60e; 作者介绍&#xff1a;我是程序员洲洲&#xff0c;一个热爱写作的非著名程序员。CSDN全栈优质领域创作者、华为云博客社区云享专家、阿里云博客社区专家博主、前后端开发、人工智能研究生。公粽号&#xff1a;程序员洲洲。 &#x1f388; 本文专栏&#xff1a;本文…

基于Genio 700 (MT8390)芯片的AR智能眼镜方案

AR眼镜是一种具有前所未有发展机遇的设备&#xff0c;无论是显示效果、体积还是功能都有明显的提升。AR技术因其智能、实时、三维、多重交互和开放世界的特点备受关注。 AR眼镜集成了AR技术、语音识别、智能控制等多项高科技功能&#xff0c;可以帮助用户实现更加便捷、高效、个…

电脑桌面任务提醒便签选择哪一个好用?

伴随着科技现代化的发展&#xff0c;电脑成为大家日常办公及生活中必不可少的工具&#xff0c;如在日常办公中大家可以借助电脑上的任务提醒便签来合理规划自己的工作时间&#xff0c;督促任务的完成&#xff0c;提高工作的效率。 当前&#xff0c;支持在电脑上安装的任务提醒…

RGB转Bayer,一个小数点引发的血案

前几天写了一个RGB数据转Bayer格式的函数&#xff0c;经过测试功能正常。后来把这个函数用到一个数据库构建中&#xff0c;结果数据库出来的结果一直是一张黑图&#xff0c;追查了好几个小时&#xff0c;总算把这只虫子找出来了&#xff0c;原来是一个整数后面的小数点作祟。 …

基础课4——客服中心管理者面临的挑战

客服管理者在当今的数字化时代也面临着许多挑战。以下是一些主要的挑战&#xff1a; 同行业竞争加剧&#xff1a;客服行业面临着来自同行业的竞争压力。为了获得竞争优势&#xff0c;企业需要不断提高自身的产品和服务质量&#xff0c;同时还需要不断降低成本、提高效率。然而…

热烈庆祝瑞森半导体成立10周年

瑞森半导体10年芯路&#xff0c;衷心感谢全球合作伙伴、 客户、员工、朋友的帮助与支持。 弹指一挥间&#xff0c;瑞森半导体已在功率半导体行业奋勇前行了十年。3650个白天与黑夜&#xff0c;瑞森半导体在风雨兼程中砥砺前行&#xff0c;在倾情奉献中不负初心。十年里有太多的…

Go ZIP压缩文件读写操作

创建zip文件 golang提供了archive/zip包来处理zip压缩文件&#xff0c;下面通过一个简单的示例来展示golang如何创建zip压缩文件&#xff1a; func createZip(filename string) {// 缓存压缩文件内容buf : new(bytes.Buffer)// 创建zipwriter : zip.NewWriter(buf)defer writ…

如何高效收集数据?

在当今这个信息爆炸的时代&#xff0c;数据已经成为了一种新的资源&#xff0c;其价值不可估量。收集数据的重要性日益凸显&#xff0c;无论是对于企业、政府还是个人&#xff0c;数据都已经成为了一种宝贵的财富。分享一些网站。 一、宏观数据 1.国家统计局&#xff08;数据…