【Linux多线程】生产者消费者模型

news2025/1/15 20:40:36

【Linux多线程】生产者消费者模型

目录

  • 【Linux多线程】生产者消费者模型
    • 生产者消费者模型
      • 为何要使用生产者消费者模型
      • 生产者消费者的三种关系
      • 生产者消费者模型优点
      • 基于BlockingQueue的生产者消费者模型
        • C++ queue模拟阻塞队列的生产消费模型
      • 伪唤醒情况(多生产多消费的情况下)

作者:爱写代码的刚子

时间:2024.3.29

前言:本篇博客将会介绍Linux多线程中一个非常重要的模型——生产者消费者模型

生产者消费者模型

  • 321原则(方便记忆):3种关系,2种角色(生产者和消费者),1个交易场所(特定结构的内存空间)

为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者的三种关系

  • 生产者VS生产者 :互斥
  • 消费者VS消费者:互斥
  • 生产者VS消费者:互斥,同步

生产者消费者模型优点

  • 生产和消费进行解耦(多线程其实也是一种解耦)
  • 支持并发
  • 支持忙闲不均

在这里插入图片描述

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构(有点类似于管道)。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

在这里插入图片描述

C++ queue模拟阻塞队列的生产消费模型

代码:

以下代码以单生产者,单消费者为例:

  • 代码一:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
template <class T>
class BlockQueue
{
    static const int defalutnum = 5;
public:
    BlockQueue(int maxcap=defalutnum):maxcap_(maxcap)
    {
        pthread_mutex_init(&mutex_,nullptr);
        pthread_cond_init(&c_cond_,nullptr);
        pthread_cond_init(&p_cond_,nullptr);
    }
    T pop()
    {
        pthread_mutex_lock(&mutex_);
        
        if(q_.size()== 0 )
        {
             pthread_cond_wait(&c_cond_,&mutex_);//生产和消费要使用不同的等待队列
        }
        
        T out = q_.front();
        q_.pop();
        pthread_cond_signal(&p_cond_);
        
        
        pthread_mutex_unlock(&mutex_);
        return out;
    }
    void push(const T &in)
    {
        pthread_mutex_lock(&mutex_);
        
        if(q_.size()==maxcap_)//判断本身也是访问临界资源
        {
            pthread_cond_wait(&p_cond_,&mutex_);//调度时自动释放锁
        }
        
        //1.队列没满 2.被唤醒
        
        q_.push(in); 
        pthread_cond_signal(&c_cond_);
        pthread_mutex_unlock(&mutex_);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }
private:
    std::queue<T> q_;//我们不直接使用stl中的queue是因为它本身不是线程安全的,共享资源
    //int mincap_;
    int maxcap_;//队列中的极值
    pthread_mutex_t mutex_;
    pthread_cond_t c_cond_;
    pthread_cond_t p_cond_;
};
#include "BlockQueue.hpp"
#include <unistd.h>
void *Consumer(void *args)
{
    BlockQueue<int> *bq =  static_cast<BlockQueue<int>*>(args); 
    while(true)
    {
        sleep(2);// 由于两个线程谁先执行是不确定的,我们让生产者先执行
        int data = bq->pop();
        std::cout<<"消费了一个数据: "<<data<<std::endl;
        
    }
}
void *Productor(void *args)
{
    BlockQueue<int> *bq =  static_cast<BlockQueue<int>*>(args); 

    int data=0;
    while(true)
    {
        ++data;
        bq->push(data);
        std::cout<<"生产了一个数据: "<<data<<std::endl;
    }
}
int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(); 
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    
    delete bq;
    
    return 0;
}

在这里插入图片描述

  • 调整代码,使其生产者生产的数据到达一定范围通知消费者,消费者消费了一定的数据通知生产者:

在这里插入图片描述

在这里插入图片描述

【问】:生产者的数据从哪里来?

用户,或者网络等。生产者生产的数据也是要花时间获取的!,所以生产者要做两件事:1. 获取数据 2. 生产数据到队列

  • 同时消费者拿到数据要做加工处理,也要花时间!,消费者要做两件事:1. 消费数据 2. 加工处理数据

【问】:生产者消费者模型为什么是高效的?

存在一个线程访问临界区的代码,一个线程正在处理数据,高效并发。

虽然互斥和同步谈不上高效,更何况加了锁,但是一个线程正在生产数据,一个线程正在消费数据,两者解偶且互不影响。(在更多的生产者消费者情况下,只有少量的执行流在互斥和同步,而大量的执行流都在并发访问)

  • 再次完善代码,使该生产者消费者模型能够执行相应的任务:

BlockQueue.hpp

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
template <class T>
class BlockQueue
{
    static const int defalutnum = 20;
public:
    BlockQueue(int maxcap=defalutnum):maxcap_(maxcap)
    {
        pthread_mutex_init(&mutex_,nullptr);
        pthread_cond_init(&c_cond_,nullptr);
        pthread_cond_init(&p_cond_,nullptr);
        low_water_ =maxcap_/3;
        high_water_ =(maxcap_*2)/3;
    }
    T pop()
    {
        pthread_mutex_lock(&mutex_);
        
        if(q_.size()== 0 )
        {
             pthread_cond_wait(&c_cond_,&mutex_);//生产和消费要使用不同的等待队列
        }
        
        
        T out = q_.front();
        q_.pop();

        if(q_.size()<low_water_)
        {
            pthread_cond_signal(&p_cond_);
        }
        
        
        pthread_mutex_unlock(&mutex_);
        return out;
    }
    void push(const T &in)
    {
        pthread_mutex_lock(&mutex_);
        
        if(q_.size()==maxcap_)//判断本身也是访问临界资源
        {
            pthread_cond_wait(&p_cond_,&mutex_);//调度时自动释放锁
        }
        
        //1.队列没满 2.被唤醒
        
        q_.push(in); 
        if(q_.size()>high_water_)
        {
            pthread_cond_signal(&c_cond_);
        }
        pthread_mutex_unlock(&mutex_);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }
    
private:
    std::queue<T> q_;//我们不直接使用stl中的queue是因为它本身不是线程安全的,共享资源
    //int mincap_;
    int maxcap_;//队列中的极值
    pthread_mutex_t mutex_;
    pthread_cond_t c_cond_;
    pthread_cond_t p_cond_;

    int high_water_;
    int low_water_;
};

main.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
void *Consumer(void *args)
{
    BlockQueue<Task> *bq =  static_cast<BlockQueue<Task>*>(args); 
    while(true)
    {
        Task t=bq->pop();


        //t.run();
        t();
        std::cout<<"处理任务: "<<t.GetTask()<<" 运算结果是: "<<t.GetResult()<<std::endl;
        
        //sleep(2);// 由于两个线程谁先执行是不确定的,我们让生产者先执行
        //std::cout<<"消费了一个数据: "<<data<<std::endl;
        
    }
}
void *Productor(void *args)
{
    int len = opers.size();
    BlockQueue<Task> *bq =  static_cast<BlockQueue<Task>*>(args); 
    
    int data=0;
    while(true)
    {
        int data1=rand()%10+1;
        usleep(10);
        int data2=rand() % 10;
        char op =opers[rand() % len];
        Task t(data1,data2,op);
        //++data;
        bq->push(t);
        //std::cout<<"生产了一个数据: "<<data<<std::endl;
        std::cout<<"生产了一个任务:"<< t.GetTask() <<std::endl;
         
        sleep(1);
    }
}
int main()
{
    srand(time(nullptr));
    BlockQueue<Task> *bq = new BlockQueue<Task>(); 
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    
    delete bq;
    
    return 0;
}

Task.hpp

#pragma once
#include <iostream>
#include <string>

std::string opers = "+-*/%";

enum
{
    DivZero = 1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
            case '+':
                result_ = data1_ + data2_;
                break;
            case '-':
                result_ = data1_ - data2_;
                break;
            case '*':
                result_ = data1_ * data2_;
                break;
            case '/':
            {
                if (data2_ == 0)
                    exitcode_ = DivZero;
                else
                    result_ = data1_ / data2_;
            }
            break;
            case '%':
            {
                if (data2_ == 0)
                    exitcode_ = ModZero;
                else
                    result_ = data1_ % data2_;
            }
            break;
            default:
                exitcode_ = Unknown;
                break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
         std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";
        return r;
    }

    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r+=oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }

    ~Task()
    {}

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

在这里插入图片描述

一定要记得,判断临界资源是否满足,也是在访问临界资源!!!

伪唤醒情况(多生产多消费的情况下)

多生产多消费的情况下:

举例:生产者只生产了一个数据,但是唤醒了多个消费者,多个消费者都在等待队列上,生产者将锁解开,多个消费者竞争这一把锁,其中一个消费者抢到了这把锁消费了一个数据,把锁解开,同时其他刚被唤醒的消费者其中又抢到了锁,进行消费,可是已经没有数据了(条件并不满足了),造成了伪唤醒的情况。(处于等待队列中的线程申请锁失败了会继续在条件变量中的等待队列中等)

或者说可能存在等待失败但是继续向下走的情况。

如何防止线程出现这种情况?

将if改成while(进行重复判断):

在这里插入图片描述

在这里插入图片描述

【问题】:无论是多生产多消费还是单生产单消费,本质上都是一个线程访问临界资源,那意义在哪?

重点是并发生产,并发消费,只是访问临界资源时是单个线程。重点不是获取数据本身,而在于处理数据!!!(本质)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1560561.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL数据库----------探索高级SQL查询语句 (二)

目录 一、子查询 1.1多表查询 1.2多层嵌套 1.3 insert语句子查询 1.4update语句子查询 1.5delete语句子查询 1.6EXISTS 1.7子查询&#xff0c;别名as 二、mysql视图 2.1mysql视图介绍 2.2mysql作用场景[图]: 2.3视图功能&#xff1a; 2.4视图和表的区别和联系 区别…

Linux 个人笔记之三剑客 grep sed awk

文章目录 零、预一、grep 文本过滤工具基础篇实战篇 二、sed 字符流编辑器基础篇实战篇 三、awk 文本处理工具基础篇实战篇 四、附xargsuniq & sort基础篇实战篇 cut 零、预 bash 的命令行展开 {} $ echo file_{1..4} file_1 file_2 file_3 file_4$ echo file_{a..d} file_…

【攻防世界】unseping (反序列化与Linux bash shell)

打开题目环境&#xff1a; 1、进行PHP代码审计&#xff0c;通过审计得知需要用到PHP反序列化。找到输出flag的位置为 ping()函数。通过使用 exec() 函数来执行 $ip 并将结果保存在 $result 中&#xff0c;最终输出 $result。 2、接着寻找给 $ip 传参的位置&#xff0c;发现通过…

FastEI论文阅读

前言 研究FastEI有很长时间了&#xff0c;现在来总结一下&#xff0c;梳理一下认知。论文地址&#xff1a;https://www.nature.com/articles/s41467-023-39279-7&#xff0c;Github项目地址&#xff1a;https://github.com/Qiong-Yang/FastEI。 概要 这篇文章做的工作是小分子…

16.面向对象的软件测试技术

主要考点&#xff1a; 1、面向对象相关的基础概念&#xff1b;&#xff08;已经在软件工程的课程中讲过&#xff0c;要熟悉UML图&#xff0c;知道类和类之间的关系&#xff0c;这些知识也可能结合到下午题考察&#xff09; 2、面向对象的软件测试技术&#xff1b;&#xff08;大…

基于单片机汽车超声波防盗系统设计

**单片机设计介绍&#xff0c;基于单片机汽车超声波防盗系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机汽车超声波防盗系统设计概要主要涉及利用超声波传感器和单片机技术来实现汽车的安全防盗功能。以下是对…

辽宁梵宁教育:点亮大学生设计技能之光

辽宁梵宁教育作为专注于设计教育的线上机构&#xff0c;对大学生设计技能的提升和就业前景产生了深远的影响。在当前数字化时代&#xff0c;设计技能已逐渐成为各行各业不可或缺的重要能力&#xff0c;而梵宁教育正是抓住了这一机遇&#xff0c;致力于培养具备创新思维和实践能…

还在问如何入门 Python 爬虫?收藏这篇直接带你上路爬虫!!

“入门”是良好的动机&#xff0c;但是可能作用缓慢。如果你手里或者脑子里有一个项目&#xff0c;那么实践起来你会被目标驱动&#xff0c;而不会像学习模块一样慢慢学习。另外如果说知识体系里的每一个知识点是图里的点&#xff0c;依赖关系是边的话&#xff0c;那么这个图一…

Java毕业设计-基于springboot开发的致远汽车租赁系统平台-毕业论文+答辩PPT(附源代码+演示视频)

文章目录 前言一、毕设成果演示&#xff08;源代码在文末&#xff09;二、毕设摘要展示1、开发说明2、需求分析3、系统功能结构 三、系统实现展示1、系统功能模块2、管理员功能模块3、业务员功能模块3、用户功能模块 四、毕设内容和源代码获取总结 Java毕业设计-基于springboot…

LC 106.从中序与后序遍历序列构造二叉树

106. 从中序与后序遍历序列构造二叉树 给定两个整数数组 inorder 和 postorder &#xff0c;其中 inorder 是二叉树的中序遍历&#xff0c; postorder 是同一棵树的后序遍历&#xff0c;请你构造并返回这颗 二叉树 。 示例 1: 输入&#xff1a; inorder [9,3,15,20,7], post…

STM32G系 编程连接不上目标板,也有可能是软件不兼容。

由于一直用的老版本STM32 ST-LINK Utility 4.20 &#xff0c;找遍了所有问题&#xff0c;SWD就是连不上目标板。 电源脚 VDDA 地线&#xff0c;SWD的四条线&#xff0c;还是不行&#xff0c;浪费了一天&#xff0c;第二天才想起&#xff0c;是不是G系升级了 SWD协议。结果下载…

安全访问多线程环境:掌握 Java 并发集合的使用技巧

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一个人虽可以走的更快&#xff0c;但一群人可以走的更远。 我是一名后…

[优选算法专栏]专题十五:FloodFill算法(二)

本专栏内容为&#xff1a;算法学习专栏&#xff0c;分为优选算法专栏&#xff0c;贪心算法专栏&#xff0c;动态规划专栏以及递归&#xff0c;搜索与回溯算法专栏四部分。 通过本专栏的深入学习&#xff0c;你可以了解并掌握算法。 &#x1f493;博主csdn个人主页&#xff1a;小…

【Vue3】el-checkbox-group实现权限配置和应用

一. 需求 针对不同等级的用户&#xff0c;配置不同的可见项 配置效果如下 &#xff08;1&#xff09;新增&#xff0c;获取数据列表 &#xff08;2&#xff09;编辑&#xff0c;回显数据列表 应用效果如下 &#xff08;1&#xff09;父级配置 &#xff08;2&#xff09;子级…

leetcode90. 子集 II

去重逻辑&#xff1a; 关键是画出递归树&#xff01;当我们即将进入第二个2的递归的时候&#xff0c;发现isVisit数组是100&#xff0c;也就是说这俩重复的数是False&#xff0c;并且这俩在nums值相同&#xff0c;所以写出去重逻辑&#xff01; class Solution { public:vector…

2024-2028年中国导电滑环市场行情及未来发展前景研究报告

导电滑环应用领域广泛 全球市场将保持增长趋势 导电滑环又称为集流环、集电环、导电环&#xff0c;是一种电气连接器件&#xff0c;用于在旋转部件和静止部件之间传输电能信号。导电滑环避免了传统导线在旋转中存在的磨损和扭伤&#xff0c;可提高机器运转效率和稳定性&#xf…

美易官方:通胀持续降温,美联储可能在6月份降息

近期&#xff0c;LPL首席经济学家在接受采访时表示&#xff0c;通胀持续降温&#xff0c;美联储可能在6月份降息。这一消息引起了市场的广泛关注和讨论。通胀一直是全球经济面临的难题之一&#xff0c;而美联储的货币政策也一直是市场关注的焦点。那么&#xff0c;通胀降温和美…

Discord绑VISA卡教程

Discord 是由美国 Discord Inc. 公司所开发的一款专为社群设计的免费网络实时通话软件与数字发行平台&#xff0c;主要针对游戏玩家、教育人士、朋友及商业人士&#xff0c;用户之间可以在软件的聊天频道通过讯息、图片、视频和音频进行交流 下面进行实际操作 1、登录discord …

【A-013】基于SSH的共享单车管理系统/共享单车出租系统

【A-013】基于SSH的共享单车管理系统/共享单车出租系统 开发环境&#xff1a; Eclipse/MyEclipse、Tomcat8、Jdk1.8 数据库&#xff1a; MySQL 适用于&#xff1a; 课程设计&#xff0c;毕业设计&#xff0c;学习等等 系统介绍&#xff1a; 基于SSH开发的共享单车管理系统/…

新质生产力:1核心,2摆脱,3关键,3因素,3特征;3要素,3措施

引言 新质生产力是指以科技创新为核心驱动力&#xff0c;通过提高全要素生产率、推动产业升级和转型&#xff0c;实现经济高质量发展的能力和水平。在当今全球经济竞争日趋激烈的背景下&#xff0c;新质生产力成为各国竞争力的关键之一&#xff0c;对于实现经济可持续发展、提…