基于阻塞队列及环形队列的生产消费模型

news2025/1/22 21:01:23

目录

条件变量函数

等待条件满足

阻塞队列

升级版

信号量

POSIX信号量

环形队列


条件变量函数

等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 
 参数: 
 cond:要在这个条件变量上等待 
 mutex:互斥量,后面详细解释 

pthread_cond_wait:第二个参数必须是正在使用的互斥锁

a.pthread_cond_wait:该函数调用时,会以原子性的方式将锁释放,并将自己挂起

b.pthread_cond_wait:该函数被唤醒返回的时候,会自动从新获取锁

阻塞队列

blockqueue.hpp

#pragma once
#include<iostream>
#include<string>
#include<queue>
#include<unistd.h>
#include<pthread.h>

using namespace std;
static const int gmaxcap=5;
template<class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap=gmaxcap)
        :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        while (is_full())
            pthread_cond_wait(&_pcond,&_mutex);//生产条件不满足
        _q.push(in);
        //阻塞队列中一定有数据
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        while(is_empty)
            pthread_cond_wait(&_ccond,&_mutex);
        *out=_q.front();
        _q.pop();
        //队列中一定有一个空位置
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size()==_maxcap;
    }
private:
    queue<T> _q;
    int _maxcap;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;//生产者对应的条件变量
    pthread_cond_t _ccond;
};


task.hpp

#pragma once
#include<iostream>
#include<cstdio>
#include<string>
#include<functional>

using namespace std;
class Task
{
    using func_t=function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
        :_x(x),_y(y),_op(op),_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d ",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ? ",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

升级版

blockqueue.hpp

#pragma once
#include<iostream>
#include<string>
#include<queue>
#include<unistd.h>
#include<pthread.h>

using namespace std;
static const int gmaxcap=5;
template<class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap=gmaxcap)
        :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        while (is_full())
            pthread_cond_wait(&_pcond,&_mutex);//生产条件不满足
        _q.push(in);
        //阻塞队列中一定有数据
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        while(is_empty)
            pthread_cond_wait(&_ccond,&_mutex);
        *out=_q.front();
        _q.pop();
        //队列中一定有一个空位置
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size()==_maxcap;
    }
private:
    queue<T> _q;
    int _maxcap;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;//生产者对应的条件变量
    pthread_cond_t _ccond;
};


task.hpp

#pragma once
#include<iostream>
#include<cstdio>
#include<string>
#include<functional>

using namespace std;
class CalTask
{
    using func_t=function<int(int,int,char)>;
public:
    CalTask()
    {}
    CalTask(int x,int y,char op,func_t func)
        :_x(x),_y(y),_op(op),_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d ",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ? ",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
const string oper="+-*/%";
int mymath(int x,int y,char op)
{
    int result=0;
    switch (op)
    {
    case '+':
        result=x+y;
        break;
    case '-':
        result=x-y;
        break;
    case '*':
        result=x*y;
        break;
    case '/':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x/y;
    }
        break;
    case '%':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x%y;
    }
        break;
    default:
        break;
    }
    return result;
}
class SaveTask
{
    typedef function<void(const string&)> func_t;
public:
    SaveTask()
    {}
    SaveTask(const string& message,func_t func)
        :_message(message),_func(func)
    {}
    void operator()()
    {
        _func(_message);
    }
private:
    string _message;
    func_t _func;
};
void Save(const string& message)
{
    const string target="./log.txt";
    FILE* fp=fopen(target.c_str(),"a+");
    if(!fp)
    {
        cerr<<"fopen error"<<endl;
        return;
    }
    fputs(message.c_str(),fp);
    fputs("\n",fp);
    fclose(fp);
}

MainCp.cc

#include"BlockQueue.hpp"
#include"task.hpp"
#include<sys/types.h>
#include<unistd.h>
#include<ctime>

//
//
template<class C,class S>
class BlockQueues
{

public:
    BlockQueue<C>* c_bq;
    BlockQueue<S>* s_bq;
};
void* consumer(void* bqs_)
{
    BlockQueue<CalTask>* bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->c_bq;
    BlockQueue<SaveTask>* save_bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->s_bq;
    
    while(true)
    {
        /* consumer */
        // int data;
        // bq->pop(&data);
        CalTask t;
        bq->pop(&t);
        string result=t();
        cout<<"消费数据: "<<result<<endl;

        SaveTask save(result,Save);
        save_bq->push(save);
        cout<<"推送保存任务完成..."<<endl;
        sleep(1);
    }
    return nullptr;
}
void* producter(void* bqs_)
{
    BlockQueue<CalTask>* bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->c_bq;
    
    while (true)
    {
        //producer
        int x=rand()%10+1;
        int y=rand()%5;
        int operCode=rand()%oper.size();
        CalTask t(x,y,oper[operCode],mymath);
        bq->push(t);
        cout<<"生产任务: "<<t.toTaskString()<<endl;
        // sleep(1);
    }
    return nullptr;
}
void* saver(void* bqs_)
{
    BlockQueue<SaveTask>* save_bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->s_bq;
    while (true)
    {
        SaveTask t;
        save_bq->pop(&t);
        t();
        cout << "推送保存任务完成..." << endl;
    }
    return nullptr;
}
int main()
{
    srand((unsigned long)time(nullptr));
    BlockQueues<CalTask,SaveTask> bqs;

    bqs.c_bq=new BlockQueue<CalTask>();
    bqs.s_bq=new BlockQueue<SaveTask>();
    pthread_t c,p,s;
    pthread_create(&c,nullptr,consumer,&bqs);
    pthread_create(&p,nullptr,producter,&bqs);
    pthread_create(&s,nullptr,saver,&bqs);
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    pthread_join(s,nullptr);
    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

./MainCp
生产任务: 9 * 0 = ? 
生产任务: 9 - 4 = ? 
生产任务: 8 - 0 = ? 
生产任务: 3 - 4 = ? 
生产任务: 6 + 1 = ? 
消费数据: 9 * 0 = 0 
推送保存任务完成...
生产任务: 2 - 2 = ? 
推送保存任务完成...
消费数据: 9 - 4 = 5 
推送保存任务完成...
生产任务: 9 - 0 = ? 
推送保存任务完成...
消费数据: 8 - 0 = 8 
推送保存任务完成...
生产任务: 6 * 3 = ? 
推送保存任务完成...
消费数据: 3 - 4 = -1 
推送保存任务完成...
生产任务: 4 * 4 = ? 
推送保存任务完成...
消费数据: 6 + 1 = 7 
推送保存任务完成...
生产任务: 5 % 4 = ? 
推送保存任务完成...
^C
zhangsan@ubuntu:~/practice-using-ubuntu/20241005/blockqueue$ cat log.txt
9 * 0 = 0 
9 - 4 = 5 
8 - 0 = 8 
3 - 4 = -1 
6 + 1 = 7 

信号量

a.信号量的本质就是计数器

b.只有拥有信号量,在未来就一定能拥有临界资源的一部分

申请信号量的本质就是:对临界资源中特点小块资源的预定机制

sem--         申请资源       P        必须保证操作的原子性

sem++       释放资源        V       必须保证操作的原子性

POSIX信号量

环形队列

RingQueue.hpp

#pragma once

#include<iostream>
#include<cassert>
#include<vector>
#include<ctime>
#include<cstdlib>
#include<semaphore.h>
#include<unistd.h>
#include<pthread.h>

static const int gcap=5;

template<class T>
class RingQueue
{
private:
    void P(sem_t& sem)
    {
        int n=sem_wait(&sem);
        assert(n==0);
    }
    void V(sem_t& sem)
    {
        int n=sem_post(&sem);
        assert(n==0);
    }
public:
    RingQueue(const int& cap=gcap):_queue(cap),_cap(cap)
    {
        int n=sem_init(&_spaceSem,0,_cap);
        assert(n==0);
        n=sem_init(&_dataSem,0,0);
        assert(n==0);
        _productorStep=_consumerStep=0;
        pthread_mutex_init(&_pmutex,nullptr);
        pthread_mutex_init(&_cmutex,nullptr);
    }
    void Push(const T& in)
    {
        
        P(_spaceSem);//productor
        pthread_mutex_lock(&_pmutex);
        _queue[_productorStep++]=in;
        _productorStep%=_cap;
        pthread_mutex_unlock(&_pmutex);//更高效
        V(_dataSem);
        
    }
    void Pop(T* out)
    {
        pthread_mutex_lock(&_cmutex);
        P(_dataSem);
        *out=_queue[_consumerStep++];
        _consumerStep%=_cap;
        V(_spaceSem);
        pthread_mutex_unlock(&_cmutex);
    }
    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }
private:
    vector<T> _queue;
    int _cap;
    sem_t _spaceSem;//生产者->空间资源
    sem_t _dataSem;
    int _productorStep;
    int _consumerStep;
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};

task.hpp

#pragma once
#include<iostream>
#include<cstdio>
#include<string>
#include<functional>

using namespace std;
class Task
{
    using func_t=function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
        :_x(x),_y(y),_op(op),_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d ",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ? ",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
const string oper="+-*/%";
int mymath(int x,int y,char op)
{
    int result=0;
    switch (op)
    {
    case '+':
        result=x+y;
        break;
    case '-':
        result=x-y;
        break;
    case '*':
        result=x*y;
        break;
    case '/':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x/y;
    }
        break;
    case '%':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x%y;
    }
        break;
    default:
        break;
    }
    return result;
}

main.cc

#include"RingQueue.hpp"
#include"task.hpp"

using namespace std;

void* ProductorRoutine(void* rq)
{
    RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>* >(rq);
    while (true)
    {
        /* code */
        int x=rand()%100;
        int y=rand()%50;
        char op=oper[rand()%oper.size()];
        Task t(x,y,op,mymath);
        ringqueue->Push(t);
        cout<<"生产者派发了一个任务: "<<t.toTaskString()<<endl;
        sleep(1);
    }
}
void* ConsumerRoutine(void* rq)
{
    RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>* >(rq);
    while (true)
    {
        /* code */
        Task t;
        ringqueue->Pop(&t);
        string result=t();
        cout<<"消费者消费了一个任务"<<result<<endl;
    }
    
}
int main()
{
    srand((unsigned int)time(nullptr));
    RingQueue<Task>* rq=new RingQueue<Task>();

    pthread_t p,c;
    pthread_create(&p,nullptr,ProductorRoutine,rq);
    pthread_create(&c,nullptr,ConsumerRoutine,rq);
    pthread_join(p,nullptr);
    pthread_join(c,nullptr);
    delete rq;
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2193043.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

文字转语音免费的有哪些?这6款文字转语音软件让你配音效果炸满!

文字转语音免费的有哪些&#xff1f;文字转语音不管在有声朗读、配乐配音、影视旁白等实际生活场景中的应用都是非常广泛的&#xff0c;而目前语音识别文字的技术日渐成熟&#xff0c;已经渗透到生活办公的日常&#xff0c;包括我们的输入法自带语音转文字&#xff0c;都可以非…

105页PPT麦肯锡:煤炭贸易企业业务战略规划方案

麦肯锡作为全球领先的管理咨询公司&#xff0c;在协助客户进行企业业务战略规划方面形成了独特且系统的方法论。以下是对麦肯锡企业业务战略规划方法论的详细阐述&#xff1a; 一、战略规划的核心要素 战略方向的明确&#xff1a;战略规划的首要任务是帮助组织明确其愿景、使…

使用ValueConverters扩展实现枚举控制页面的显示

1、ValueConverters 本库包含了IValueConverter接口的的最常用的实现&#xff0c;ValueConverters用于从视图到视图模型的值得转换&#xff0c;某些情况下&#xff0c;可用进行反向转换。里面有一些抽象类、模板类的定义&#xff0c;可以继承这些类实现一些自己想要实现的功能…

k8s中pod的管理

一、资源管理 1.概述 说到k8s中的pod&#xff0c;即荚的意思&#xff0c;就不得不先提到k8s中的资源管理&#xff0c;k8s中可以用以下命令查看我们的资源&#xff1a; kubectl api-resources 比如我们现在需要使用k8s开启一个东西&#xff0c;那么k8s通过apiserver去对比etc…

C++模版SFIANE应用踩的一个小坑

一天一个C大佬同事&#xff0c;突然截图过来一段代码&#xff1a;这写的啥呀&#xff0c;啰里吧嗦的&#xff0c;这个构造函数模板参数T1感觉是多余的呀 template<class T> class TestClass { public:TestClass(){}//函数1template<class T1 T, std::enable_if_t<…

【springboot】简易模块化开发项目整合Redis

接上一项目&#xff0c;继续拓展项目 1.整合Redis 添加Redis依赖至fast-demo-config模块的pom.xml文件中 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependenc…

Jax(Random、Numpy)常用函数

目录 Jax vmap Array reshape Random PRNGKey uniform normal split choice Numpy expand_dims linspace jax.numpy.linalg[pkg] dot matmul arange interp tile reshape Jax jit jax.jit(fun, in_shardingsUnspecifiedValue, out_shardingsUnspecifiedVa…

国际象棋和大模型的内部世界 (2)

国际象棋和大模型的内部世界 &#xff08;2&#xff09; 最近一直在做大模型的一些实践和应用工作。最近看了一些agent的一些在大模型上的探索&#xff0c;包括基于大模型驱动的类似MUD类的游戏。 最近2篇论文都是基于国际象棋的&#xff0c;作者的思路基本上差不多&#xff0c…

SQL第12课——联结表

三点&#xff1a;什么是联结&#xff1f;为什么使用联结&#xff1f;如何编写使用联结的select语句 12.1 联结 SQL最强大的功能之一就是能在数据查询的执行中联结&#xff08;join)表。联结是利用SQL的select能执行的最重要的操作。 在使用联结前&#xff0c;需要了解关系表…

Vue组件库Element-ui

Vue组件库Element-ui Element是一套为开发者、设计师和产品经理准备的基于Vue2.0的桌面端组件库。Element - 网站快速成型工具 安装element-ui npm install element-ui # element-ui版本&#xff08;可以指定版本号引入ElementUI组件库&#xff0c;在main.js中添加内容得到&…

【动态规划-最长公共子序列(LCS)】力扣1035. 不相交的线

在两条独立的水平线上按给定的顺序写下 nums1 和 nums2 中的整数。 现在&#xff0c;可以绘制一些连接两个数字 nums1[i] 和 nums2[j] 的直线&#xff0c;这些直线需要同时满足&#xff1a; nums1[i] nums2[j] 且绘制的直线不与任何其他连线&#xff08;非水平线&#xff09…

graphql--快速了解graphql特点

graphql--快速了解graphql特点 1.它的作用2.demo示例2.1依赖引入2.2定义schema2.3定义GrapQL端点2.4运行测试2.5一些坑 今天浏览博客时看到graphQL,之前在招聘网站上第一次接触,以为是图数据查询语言, 简单了解后,发现对graphQL的介绍主要是用作API的查询语言,不仅限于图数据查…

dbeaver的使用

新增mysql连接 新增clickhouse 连接 新建编辑器 执行 结果&#xff0c;想看某条结果明细&#xff0c;选中某行安tab键 设置快捷键 窗口-》首选项-》用户界面-》键

ReentrantLock 实现原理

文章目录 ReentrantLock 基本使用可重入锁等待可中断设置超时时间公平锁条件变量 ReentrantLock 原理加锁流程解锁流程可重入锁原理可打断原理公平锁原理条件变量原理 ReentrantLock 基本使用 在Java中&#xff0c;synchronized 和 ReentrantLock 都是用于确保线程同步的锁&am…

JUPITER Benchmark Suite:是一套全面的23个基准测试程序,目的支持JUPITER——欧洲首台E级超级计算机的采购

2024-08-30&#xff0c;由于利希超级计算中心 创建JUPITER Benchmark Suite&#xff0c;这是一个全面的 23 个基准测试程序集合&#xff0c;经过精心记录和设计&#xff0c;目的支持购买欧洲第一台百万兆次级超级计算机 JUPITER。 一、研究背景&#xff1a; 随着E级超级计算机…

AI大模型有哪些,收藏起来防踩坑

大模型是指具有数千万甚至数亿参数的深度学习模型&#xff0c;通常由深度神经网络构建而成&#xff0c;拥有数十亿甚至数千亿个参数。大模型的设计目的是为了提高模型的表达能力和预测性能&#xff0c;能够处理更加复杂的任务和数据。以下是对大模型的详细数据与分析&#xff1…

在网页中渲染LaTex公式

概述 MathJax可以实现网页浏览器中的LaTex公式渲染。 引入 可以使用特定的模板形式引入和配置&#xff0c;具体可参考&#xff1a;配置mathjax — MathJax 3.2 文档 (osgeo.cn)。其中代码可以以CDN形式引入&#xff1a;mathjax (v3.2.2) -BootCDN。 <script> MathJax …

【C++驾轻就熟】vector深入了解及模拟实现

​ 目录 ​编辑​ 一、vector介绍 二、标准库中的vector 2.1vector常见的构造函数 2.1.1无参构造函数 2.1.2 有参构造函数&#xff08;构造并初始化n个val&#xff09; 2.1.3有参构造函数&#xff08;使用迭代器进行初始化构造&#xff09; 2.2 vector iterator 的使…

集全CNS!西北农林发表建校以来第一篇Nature

9月25日&#xff0c;国际学术期刊《自然》&#xff08;Nature&#xff09;在线发表了西北农林科技大学青年科学家岳超研究员领衔的团队题为《极端森林大火放大火后地表升温》的研究成果。该研究首次从林火规模这一独特视角&#xff0c;揭示了极端大火对生态系统破坏性、林火碳排…

受电端取电快充协议芯片的工作原理

随着电池技术的不断进步&#xff0c;快充技术应运而生&#xff0c;它以惊人的速度解决了“电量焦虑”成为手机技术发展的重要程碑。 快充技术&#xff0c;通过提高充电功率&#xff0c;大幅度缩短手机等设备充电时间的技术。相对于传统的慢充方式&#xff0c;快充技术能够在短…