Linux之【多线程】生产者与消费者模型BlockQueue(阻塞队列)

news2024/11/13 11:26:40

生产者与消费者模型

  • 一、了解生产者消费者模型
  • 二、生产者与消费者模型的几种关系及特点
  • 三、BlockQueue(阻塞队列)
    • 3.1 基础版阻塞队列
    • 3.2 基于任务版的阻塞队列
    • 3.3 进阶版生产消费模型--生产、消费、保存
  • 四、小结

一、了解生产者消费者模型

举个例子:学生要买东西,一般情况下都会直接联系厂商,因为买的商品不多,对于供货商来说交易成本太高,所以有了交易场所超市这个媒介的存在。目的就是为了集中需求,分发产品。
在这里插入图片描述

消费者与生产者之间通过了超市进行交易。当生产者不需要的时候,厂商可以继续生产,当厂商不再生产的时候消费者购买商品!

上述生产的过程和消费的过程互相影响的程度很低——解耦
临时的保存产品的场所——缓冲区

函数调用:main函数通过用户输入生产了数据,用变量保存了数据,要调用的函数消费了数据,当main函数调用func函数,main函数就会阻塞等待func函数返回,这种情况称为强耦合关系

利用生产者消费者模式可以解决强耦合问题,将串行调用改为并行执行,提高执行效率,完成逻辑的解耦。
在这里插入图片描述

二、生产者与消费者模型的几种关系及特点

对消费者与生产者模型,可以用以下321原则说明

  • 三种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥&&同步),互斥保证共享资源的安全性,同步是为了提高访问效率

  • 二种角色:生产者线程,消费者线程

  • 一个交易场所:一段特定结构的缓冲区

生产消费模型的特点

  1. 未来生产线程和消费线程进行解耦

  2. 支持生产和消费的一段时间的忙闲不均的问题(缓存区有数据有空间)

  3. 生产者专注生产,消费专注消费,提高效率

如果缓冲区满了,生产者只能进行等待,如果超市缓冲区为空,消费者只能进行等待。

三、BlockQueue(阻塞队列)

3.1 基础版阻塞队列

阻塞队列:阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构

阻塞队列为空时,从阻塞队列中获取元素的线程将被阻塞,直到阻塞队列被放入元素。
阻塞队列已满时,往阻塞队列放入元素的线程将被阻塞,直到有元素被取出。
在这里插入图片描述

单生产单消费测试

//BlockQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
using  namespace std;

const int gmaxcap=5;

template<class T>
class BlockQueue
{  
private:
    std::queue<T> q_;
    int maxcap_;//队列容量
    pthread_mutex_t mutex_;
    pthread_cond_t  pcond_;//生产者对应的条件变量
    pthread_cond_t  ccond_;//消费者者对应的条件变量
public:
    BlockQueue(const int& maxcap=gmaxcap)
    :maxcap_(maxcap)
    {
        pthread_mutex_init(&mutex_,nullptr);
        pthread_cond_init(&pcond_,nullptr);
        pthread_cond_init(&ccond_,nullptr);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&pcond_);
        pthread_cond_destroy(&ccond_);
    }

    void push(const T &in)//输入性参数 &
    {
        pthread_mutex_lock(&mutex_);

        //1.判断
        //细节2:充当条件变量的语法必须是while,不能是if
        while(is_full())
        {   
            //细节1:该函数会以原子性的方式将锁释放,并将自己挂起
            //被唤醒的时候会自动获取传入的锁
            pthread_cond_wait(&pcond_,&mutex_);//缓冲区满,生产者阻塞等待
        }
        //2.这一步一定没有满
        q_.push(in);
        //3.堵塞队列一定有数据

        //细节3:唤醒行为可以放在解锁前也可以放在解锁后
        pthread_cond_signal(&ccond_);//唤醒消费者
        pthread_mutex_unlock(&mutex_);
        //pthread_cond_signal(&ccond_);//唤醒消费者

    }
    void pop(T* out)//输出型参数:*
    {
        pthread_mutex_lock(&mutex_);
        //1.判断
        while(is_empty())
        {
            pthread_cond_wait(&ccond_,&mutex_);//缓冲区空,消费者阻塞等待
        }
        //2.这一步一定没有满
        *out = q_.front();
        q_.pop();

        //3.堵塞队列一定没有满
        pthread_cond_signal(&pcond_);//唤醒生产者
        pthread_mutex_unlock(&mutex_);
    }
private:
    bool is_empty()
    {
        return q_.empty();
    }    
    bool is_full()
    {
        return q_.size()==maxcap_;
    } 
};
//Main.cc
#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>


void* productor(void* bq_)//生产
{
    BlockQueue<int> * bq=static_cast<BlockQueue<int>*>(bq_);
    while (true)
    {
        int data=rand()%10+1;
        bq->push(data);
        std::cout<<"生产数据: "<<data<<std::endl;
        
    }
    return nullptr;
}



void* consumer(void* bq_)//消费
{
    BlockQueue<int> * bq=static_cast<BlockQueue<int>*>(bq_);
    while (true)
    {
        int data;
        bq->pop(&data);
        std::cout<<"消费数据: "<<data<<std::endl;
        sleep(1);
    }
    return nullptr;
}



int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    BlockQueue<int> * bq=new BlockQueue<int>();

    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}

控制生产速度,即每间隔1s生产一次,生产一个消费一个,而且消费的都是最新的数据
在这里插入图片描述
控制消费速度,即每间隔1s消费一次,刚开始生产多个,稳定后生产一个消费一个,消费的是以前的数据
在这里插入图片描述

以上代码的三个细节

  • 细节一:pthread_cond_wait(&pcond_,&mutex_);第二个参数是锁,该函数调用会以原子性的方式将锁释放,并将自己挂起;被唤醒的时候会自动获取传入的锁
  • 细节二:判断空和满的时候要用while,存在多个生产者因满挂起后,消费者使用一个后,同时唤醒所有生产者,导致数据多增加
  • 细节三:唤醒行为可以放在解锁前也可以放在解锁后。解锁前唤醒:唤醒之后某个生产者得到锁的优先级高,消费者释放,生产者立马拿到;解锁后唤醒:随机被某个消费者拿走锁,不影响

3.2 基于任务版的阻塞队列

基于上述代码,新建一个Task.hpp,用来给线程派发任务执行任务

BlockQueue.hpp如上
/*****************/
#pragma once

#include <iostream>
#include <cstdio>
#include <functional>
class Task
{

public:
    using func_t =std::function<int(int,int,char)>;
    Task(){}
    Task(int x,int y,char op,func_t callback)
        :x_(x),y_(y),op_(op),callback_(callback)
    {

    }
    std::string operator()()
    {
        int result=callback_(x_,y_,op_);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d=%d",x_,op_,y_,result);
        return buffer;
    }

    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d=?",x_,op_,y_);
        return buffer;
    }
private:
    int x_;
    int y_;
    char op_;
    func_t callback_;
};

/**************************/
#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "Task.hpp"

const std::string oper = "+-*/%"; 
int mymath(int x,int y,char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
        break;
    default:
        break;
    }
    return result;
}

void* productor(void* bq_)//生产
{    
    BlockQueue<Task> * bq=static_cast<BlockQueue<Task>*>(bq_);
    while (true)
    {
        int x=rand()%100+1;
        int y=rand()%10;
        int operCode=rand() % oper.size();

        Task t(x,y,oper[operCode],mymath);
        bq->push(t);
        std::cout<<"生产任务: "<<t.toTaskString()<<std::endl;
        sleep(1);
    }
    return nullptr;
}



void* consumer(void* bq_)//消费
{
    BlockQueue<Task> * bq=static_cast<BlockQueue<Task>*>(bq_);
    while (true)
    {
        Task t;
        bq->pop(&t);
        std::cout<<"消费任务:"<<t()<<std::endl;
        //sleep(1);
    }
    return nullptr;
}



int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    BlockQueue<Task> * bq=new BlockQueue<Task>();

    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}


3.3 进阶版生产消费模型–生产、消费、保存

任务目标:

  1. 生产者(线程1)生产任务加入到计算任务队列中
  2. 消费者&生产者(线程2)消费计算队列中任务并将计算结果推送到存储任务队列中
  3. 消费者(线程3)消费存储任务队列,将结果保存到文件中

在这里插入图片描述

设计思路

  1. 生产者productor将计算任务CalTask,push到计算队列中

  2. 消费者&生产者consumer获取计算任务CalTask,并将计算任务结果结合Save方法构造一个SaveTask对象,然后将这个对象push到存储队列中

  3. 消费者saver拿到存储任务,通过回调函数将数据写进文件中

代码实现如下:

/*Main.cc*/
#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "Task.hpp"
//定义一个队列保存计算任务队列和保存任务队列
template<class C,class S>
class TwoBlockQueue
{
public:
    BlockQueue<C> * c_bq;
    BlockQueue<S> * s_bq;
};

void* productor(void* bqs)//生产
{    
    BlockQueue<CalTask> * bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->c_bq;
    while (true)
    {
        int x=rand()%100+1;
        int y=rand()%10;
        int operCode=rand() % oper.size();

        CalTask t(x,y,oper[operCode],mymath);
        bq->push(t);
        std::cout<<"productor->生产任务: "<<t.toTaskString()<<std::endl;
        sleep(1);
    }
    return nullptr;
}



void* consumer(void* bqs)//消费
{
    //拿到计算队列
    BlockQueue<CalTask> * bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->c_bq;
    //拿到保存队列
    BlockQueue<SaveTask> * save_bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->s_bq;
    while (true)
    {
        //得到任务并处理
        CalTask t;
        bq->pop(&t);
        string result=t();//

        std::cout<<"consumer->消费任务:"<<result<<std::endl;

        //存储任务
        SaveTask save(result,Save);
        save_bq->push(save);
        std::cout<<"consumer->推送保存任务完成..."<<std::endl;

        //sleep(1);
    }
    return nullptr;
}

void* saver(void* bqs)
{
    BlockQueue<SaveTask> * save_bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->s_bq;
    while (true)
    {
        SaveTask t;
        save_bq->pop(&t);
        t();
        cout<<"saver->保存任务完成"<<endl;
    }
    
    return nullptr;
}




int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    TwoBlockQueue<CalTask,SaveTask> bqs;
    bqs.c_bq=new BlockQueue<CalTask>();
    bqs.s_bq=new BlockQueue<SaveTask>();


    pthread_t c,p,s;
    pthread_create(&c,nullptr,consumer,&bqs);
    pthread_create(&p,nullptr,productor,&bqs);
    pthread_create(&s,nullptr,saver,&bqs);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    pthread_join(s,nullptr);
    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

Task.hpp

#pragma once

#include <iostream>
#include <cstdio>
#include <cstring>
#include <functional>
class CalTask//计算任务
{

public:
    using func_t =std::function<int(int,int,char)>;
    CalTask(){}
    CalTask(int x,int y,char op,func_t callback)
        :x_(x),y_(y),op_(op),callback_(callback)
    {

    }
    std::string operator()()
    {
        int result=callback_(x_,y_,op_);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d=%d",x_,op_,y_,result);
        return buffer;
    }

    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d=?",x_,op_,y_);
        return buffer;
    }
private:
    int x_;
    int y_;
    char op_;
    func_t callback_;
};
const std::string oper = "+-*/%"; 
int mymath(int x,int y,char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
        break;
    default:
        break;
    }
    return result;
}


class SaveTask
{
    typedef std::function<void(const std::string&)> func_t;
public:
    SaveTask(){}
    SaveTask(const std::string& message,func_t func)
    :message_(message),func_(func)
    {

    }
    void operator()()
    {
        func_(message_);
    }

private:
    std::string message_;
    func_t func_;
};
void Save(const std::string& message)
{
    const std::string target="./log.txt";
    FILE* fp=fopen(target.c_str(),"a+");
    if(!fp)
    {
        std::cerr<<"fopen error"<<endl;
        return;
    }
    fputs(message.c_str(),fp);
    fputs("\n",fp);
    fclose(fp);
}

在这里插入图片描述

四、小结

阻塞队列也适用于多生产者多消费
在阻塞队列中,无论外部线程再多,真正进入到阻塞队列里生产或消费的线程永远只有一个
在一个任务队列中,有多个生产者与多个消费者,由于有锁的存在,所以任意时刻只有一个执行流在阻塞队列里放或者取。

生产消费模型高效体现在哪里

高效并不是体现在从队列中消费数据高效

而是我们可以让一个、多个线程并发的同时计算多个任务!在计算多个任务的同时,并不影响其他线程继续从队列里拿任务的过程。

也就是说,生产者消费者模型的高效:可以在生产之前与消费之后让线程并行执行

生产任务需要花费时间,不是把任务放进队列就完事了;消费任务也是需要时间的,不是把任务从队列中拿出来就完事了,还要处理它,处理它期间不影响其它线程消费,反之亦然,这才是生产者与消费者模型的高效体现!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/481022.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机组成原理第五章(3)DMA处理

三种控制方式&#xff1a;程序查询程序中断DMA方式 回顾一下 之前的中断控制方式&#xff0c;如果是输入命令&#xff0c;先启动命令&#xff0c;通过地址总线选择相应的接口&#xff0c;通过地址译码之后的到选择电路&#xff0c;这个接口准备开始工作。 启动外部设备&#…

网络编程总结一:

一、网络基础&#xff1a; 概念&#xff1a;1> 网络编程的本质就是进程间的通信&#xff0c;只不过进程分布在不同的主机上 2>在跨主机传输过程中&#xff0c;需要确定通信协议后&#xff0c;才可以通信 1. OSI体系结构&#xff08;重点&#xff09; 定义7层模型&…

类和对象(上篇)【C++】

C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题的步骤&#xff0c;通过函数调用逐步解决问题。 C是基于面向对象的&#xff0c;关注的是对象&#xff0c;将一件事拆分成不同的对象&#xff0c;靠对象之间的交互完成。 目录 一、类的定义 二、访问限定符…

【小样本分割 2020 TPAMI 】PFENet

文章目录 【小样本分割 2020 TPAMI 】PFENet1. 简介1.1 问题1) 高级特征误用造成的泛化损失2) 查询样本和空间样本之间的空间不一致 1.2 方法 2. 网络2.1 整体架构2.2 先验掩膜生成2.3 FEM模块 3. 代码 【小样本分割 2020 TPAMI 】PFENet 论文题目&#xff1a;Prior Guided Fea…

flinkCDC相当于Delta.io中的什么 delta.io之CDF

类似flink CDC databricks 官方文档: How to Simplify CDC With Delta Lakes Change Data Feed - The Databricks Blog delta.io 官方文档: Change data feed — Delta Lake Documentation 概述 更改数据馈送 (CDF) 功能允许 Delta 表跟踪 Delta 表版本之间的行级更改 在…

C语言函数与递归

目录&#x1f60a; 1. 函数是什么&#x1f43e; 2. 库函数&#x1f43e; 3. 自定义函数&#x1f43e; 4. 函数参数&#x1f43e; 5. 函数调用&#x1f43e; 6. 函数的嵌套调用和链式访问&#x1f43e; 7. 函数的声明和定义&#x1f43e; 8. 函数递归&#x1f43e; 1. 函…

二叉搜索树(内含AVL树的旋转操作的详细解释)

二叉搜索树 二叉搜索树的概念二差搜索树结构设计二叉搜索树的操作以及实现遍历判空插入查找删除(☆☆☆)二叉搜索树的其他方法 二叉搜索树的应用二叉搜索树的性能分析二叉树习题练习AVL树AVL树的概念AVL树的结构设计AVL树的插入(非常重要)AVL树的旋转(☆☆☆☆☆)AVL树的插入操…

基于STATCOM的风力发电机稳定性问题仿真分析(Simulink)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

网页三剑客之 HTML

本章开始我们来介绍一下网页前端部分&#xff0c;我们只是简单的介绍一些常用的各种标签&#xff0c;其目的在于为我们后面的项目做准备。 我们并不要求能完全掌握前端的语法&#xff0c;但是在见到以后能够认识这些代码就可以了。 想走后端开发的&#xff0c;前端不需要多么…

【C++】哈希和unordered系列封装

1.哈希 1.1 哈希概念 顺序结构以及平衡树中&#xff0c;元素关键码与其存储位置之间没有对应的关系&#xff0c;因此在查找一个元素时&#xff0c;必须要经过关键码的多次比较。顺序查找时间复杂度为O(N)&#xff0c;平衡树中为树的高度&#xff0c;即O( l o g 2 N log_2 N l…

CSAPP学习笔记 2 浮点数(自用)

1. 首先 我们回忆一下计算机思维导论的编码问题 小白鼠问题 (107条消息) 小白鼠喝水问题------计算机思维 编码思想(自用)_和光同尘463的博客-CSDN博客 2. 对于一些可表示的浮点数比如 101.11可以用二进制精确表示 因为是2的倍数 但是 对于一些不可整除的浮点数 我们又如何…

阿里云服务器部署node项目笔记

阿里云部署node项目笔记 此过程中全部安装都按照B站教程实现本篇是个人笔记&#xff0c;许多细节并未陈述比如开发阿里云对应端口等&#xff0c;不是完整的过程&#xff0c;如有误导在此致歉。 安装node报错linux查看nginx配置文件 使用 nginx -t mongodb数据库安装解决&#x…

【JAVA】#详细介绍!!! 文件操作之File对象(1)!

本文内容不涉及文件内容操作&#xff0c;主要是对指定文件元信息的获取&#xff0c;以及通过java代码如何创建一个文件或者删除文件 目录 文件操作的File对象 File对象的基本操作方法 得到文件&#xff08;夹&#xff09;对象的信息元 1.getParent 2. getName 3.getPath 4…

CentOS 安装与配置Nginx【含修改配置文件】

1.安装Nginx yum install nginx -y2.启动Nginx systemctl start nginx查询是否启动nginx systemctl status nginx3.尝试访问 这是默认的配置文件 # For more information on configuration, see: # * Official English Documentation: http://nginx.org/en/docs/ # * …

wordcloud制作词云图

wordcloud制作词云图 wordcloud中文方框问题 jieba&#xff08;分词&#xff09;jieba库分词的三种模式 wordcloud WordCloud(font_pathNone, width400, height-200,margin2,maskNone, max_words200, min_font_size4, stopwordsNone,background_colorblack, max_font_sizeNone…

js中setinterval怎么用?怎么才能让setinterval停下来?

setinterval()是定时调用的函数&#xff0c;可按照指定的周期&#xff08;以毫秒计&#xff09;来调用函数或计算表达式。 setinterval()的作用是在播放动画的时&#xff0c;每隔一定时间就调用函数&#xff0c;方法或对象。 setInterval() 方法会不停地调用函数&#xff0c;…

浙大数据结构与算法一些有意思的理论基础题

堆栈 有人给出了堆栈用数组实现的另一种方式&#xff0c;即直接在函数参数中传递数组和top变量&#xff08;而不是两者组成的结构指针&#xff09;&#xff0c;其中Push操作函数设计如下。这个Push函数正确吗&#xff1f;为什么&#xff1f; #define MaxSize 100 ElementTyp…

Three.js--》Gsap动画库基本使用与原理

目录 Gsap动画库使用讲解 Gsap动画库基本使用 修改自适应画面及双击进入全屏 设置stats性能监视器 Gsap动画库使用讲解 GSAP的全名是GreenSock Animation Platform&#xff0c;是一个从flash时代一直发展到今天的专业动画库&#xff0c;今天将其与three.js进行结合&#x…

【DevOps视频笔记】1. DevOps的诞生

视频官网 目录 一、DevOps介绍 定义&#xff1a; 作用&#xff1a; 核心&#xff1a; 二、软件开发流程 三、流程图 一、DevOps介绍 定义&#xff1a; Development & Operations的缩写&#xff0c;也就是开发&运维DevOps 是一个不断提高效率并且持续不断工作的…

GPIO输出——LED闪烁、LED流水灯、蜂鸣器

1、STM32F1 GPIO 简介 GPIO &#xff08; General Purpose Input Output &#xff09;通用输入输出口 可配置为 8 种输入输出模式 引脚电平&#xff1a; 0V~3.3V &#xff0c;部分引脚可容忍 5V 输出模式下可控制端口输出高低电平&#xff0c;用以驱动 LED 、控制蜂鸣器、模拟通…