基于blockqueue的生产和消费模型

news2024/9/20 9:03:51

线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念:

同步:在保证数据安全的条件下,让线程按某种特定的顺序依次访问临界资源。

通过上一节的代码我们实现了一个多线程抢票的程序,但结果显示的是一个线程在疯狂的抢票,这就导致了其他线程抢占不到临界资源而导致的饥饿问题。

所以在抢代码的代码逻辑中,我们需要保证各个线程以同步的方式进行抢票。那如何实现呢?就需要用到信号量:

条件变量的接口函数:

条件变量的使用:

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>

int tickets = 1000;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *start_routine(void *args)
{
    std::string name = static_cast<const char *>(args);
    while (true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex); // 为什么要有mutex,后面就说
        // 判断暂时省略
        std::cout << name << " -> " << tickets << std::endl;
        tickets--;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    // 通过条件变量控制线程的执行
    pthread_t t[5];
    for (int i = 0; i < 5; i++)
    {
        char *name = new char[64];
        snprintf(name, 64, "thread %d", i + 1);
        pthread_create(t+i, nullptr, start_routine, name);
    }

    while (true)
    {
        sleep(1);
        // pthread_cond_signal(&cond);
        pthread_cond_broadcast(&cond);
        std::cout << "main thread wakeup one thread..." << std::endl;
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(t[i], nullptr);
    }

    return 0;
}

条件变量的理解:

我们用一个例子来说明:

没有条件变量就好比是面试官在一个办公室面试,只要应聘者得到门外挂着的钥匙进来就可以面试。这时一个应聘者抢到了钥匙进入了办公室,面试结束后想要将钥匙归还时感觉自己面的不好,这时自己离钥匙也是最近的,所以刚出门又拿起了钥匙继续面试。这钥匙就导致了其他应聘者拿不到而这个应聘者在疯狂的面试,很明显这不合理。

但是有了条件变量就好比是面试官在外面设置了一个等待区,面试完的人必须归还钥匙并且再次回到等待队列的最末尾重新排队,这就保证了各个应聘者都有机会面试。

 条件不满足的时候,线程必须去某些定义好的条件变量上进行等待

pthread_cond_wait第二个参数的意义(将锁传进去):该函数调用的时候,会以原子性的方式,将锁释放,并将线程挂起等待。用pthread_cond_sign将线程唤醒时,该线程会拿着你所传入的锁将代码继续向下运行。通过以上知识的铺垫让我们进入今天的正题:

生产者消费者模型

什么是生产者消费者模型呢?我们用一个形象的图来说明:

生产者消费者模型就是生产者可以将生产的数据存放在一个共享区,消费者从共享区中获得数据进行消费。由于是共享区,就要保证数据的安全性。当生产者生产一个数据时,另一个生产者不能在同一个数据上生产,不然会导致数据的不安全性。因此生产者和生产者之间是互斥关系的。同样消费者也不能同时消费同一个数据,不然可能会导致数据的不一致性,因此消费者和消费者之间是互斥关系的。如果这时一个生产者正在生产一个数据,而同时消费者也正在消费这个数据。就可能导致数据还没生产完全就已经被消费了,也会导致数据的不安全性。因此生产者和消费者之间是互斥关系,同时我们还希望生产者生产以后就有消费者来消费,消费者消费完一个就有生产者来生产,因此生产者和消费者之间是同步关系。

总结一下就是3种关系、两种角色、一个交易场所:

这一段特殊的缓冲区可以提前存放一批数据,这样消费者想消费的时候消费,生产者想什么时候生产就什么时候生产。解决了生产和消费两批线程忙闲不均的问题,是它们不具有强耦合的关系。

基于blockqueue的生产和消费模型

基于普通方式处理数据

代码实现:

//Main.cc

#include <iostream>
#include <unistd.h>
#include <time.h>
#include "BlockQueue.hpp"


using namespace std;


void* consumer(void* args)
{
    BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        int val =0;
        bq->pop(&val);
        cout<<"我是消费者,我消费了一个数字:"<<val<<endl;
        sleep(1);
    }
 
    return nullptr;
}


void* productor(void* args)
{
    BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        int val =rand()%10;
        bq->push(val);
        cout<<"我是生产者,我生产了一个数字:"<<val<<endl;
        //sleep(1);
    }
    return nullptr;
}

int main()
{

    srand((unsigned long)time(nullptr)^getpid());
    BlockQueue<int>* queue =new BlockQueue<int>(5);
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,(void*)queue);
    pthread_create(&p,nullptr,productor,(void*)queue);
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);

    return 0;
}


//BlockQueue.hpp
#include <queue>
#include <pthread.h>
#include <iostream>
using namespace std;

const int NUM =5;
template<class T>
class BlockQueue
{
public:
    BlockQueue(const int numsize =NUM)
        :_maxsize(NUM)
    {
        pthread_mutex_init(&_lock,nullptr);
        pthread_cond_init(&_ccond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
        
    }

    void push(const T& in)
    {
        pthread_mutex_lock(&_lock);
        while(is_Full())
        {
            //这里说明阻塞队列是满的,需要让生产者等待
            pthread_cond_wait(&_pcond,&_lock);
            
        }
        //这里说明阻塞队列至少有一个空位可以插入
        _queue.push(in);
        //唤醒消费者去消费
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_lock);
        
    }

    void pop(T* out)
    {
        pthread_mutex_lock(&_lock);
        while(is_Empty())
        {
            //这里说明阻塞队列是空的,需要让消费者等待
            pthread_cond_wait(&_ccond,&_lock);
        }
        //这里说明阻塞队列至少有一个数据
        *out=_queue.front();
        _queue.pop();
        //唤醒生产者生产数据
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_lock);


    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_ccond);
        pthread_cond_destroy(&_pcond);

    }

private:
    bool is_Full()
    {
        return _queue.size()==_maxsize;
    }

    bool is_Empty()
    {
        return _queue.empty();
    }


private:
    queue<T> _queue;
    int _maxsize;
    pthread_mutex_t _lock;   //保护临界资源的锁
    pthread_cond_t _ccond;   //消费者的条件变量
    pthread_cond_t _pcond;   //生产者的条件变量
};

代码细节:

基于计算器任务的Task

我们得创建一个task.hpp,里面定义一个CalTask类:

class CalTask
{
public:
    using func_t =std::function<int(int,int,char)>;

    CalTask(){}
        
    CalTask(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func)
    {

    }

    std::string operator()()
    {
        int result =_callback(_x,_y,_op);
        char buffer[64];
        snprintf(buffer,sizeof buffer,"%d %c %d =%d",_x,_op,_y,result);
        return buffer;

    }

    std::string to_string()
    {
        char buffer[64];
        snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }



    ~CalTask()
    {

    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callback;

};

生产者任务:

void* productor(void* args)
{
    BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);
    while(true)
    {
        int x =rand()%10;
        int y =rand()%10;
        char op =op_str[rand()%op_str.size()];
        CalTask task(x,y,op,mymath);
        bq->push(task);
        cout<<"productor task:"<<task.to_string()<<endl;
        //sleep(1);
    }
    return nullptr;
}

消费者任务:

void* consumer(void* args)
{
    BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);
    while(true)
    {
        CalTask t;
        bq->pop(&t);
        string result =t();
        cout<<"consumer result:"<<result<<endl;
        sleep(1);
    }
 
    return nullptr;
}


基于两个阻塞队列实现计算与存储:

将计算和存储的两个队列放进同一个类中:

template<class c,class s>
class BlockQueues
{

public:
    BlockQueue<c>* c_bq; 
    BlockQueue<s>* s_bq;
};

main函数:

int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    BlockQueues<CalTask,SaveTask> bqs;
    
  
    bqs.c_bq =new BlockQueue<CalTask>;
    bqs.s_bq =new BlockQueue<SaveTask>;
    //BlockQueue<CalTask>* bqs =new BlockQueue<CalTask>(5);
    pthread_t c,p,s;
    pthread_create(&c,nullptr,consumer,(void*)&bqs);
    pthread_create(&p,nullptr,productor,(void*)&bqs);
    pthread_create(&s,nullptr,saver,(void*)&bqs);


    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    pthread_join(s,nullptr);

    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

存储任务:

class SaveTask
{
    typedef std::function<void(const std::string&)> func_t;
public:
    SaveTask()
    {}
    SaveTask(const std::string &message, func_t func)
    : _message(message), _func(func)
    {}
    void operator()()
    {
        _func(_message);
    }
private:
    std::string _message;
    func_t _func;
};

void Save(const std::string &message)
{
    const std::string target = "./log.txt";
    FILE *fp = fopen(target.c_str(), "a+");
    if(!fp)
    {
        std::cerr << "fopen error" << std::endl;
        return;
    }
    fputs(message.c_str(), fp);
    fputs("\n", fp);
    fclose(fp);
}

存储线程执行的任务:

void *saver(void *bqs_)
{
    BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;

    while(true)
    {
        SaveTask t;
        save_bq->pop(&t);
        t();
        std::cout << "save thread,保存任务完成..." << std::endl; 
    }
    return nullptr;
}

代码的执行结果:

生产者与消费者模型优势总结:

这个模型的优势不在于多个线程能够并发式的对共享资源里面的数据进行访问和处理,而是多个线程能够在加载任务处理任务的时候进行并发处理。

在我们上面的代码逻辑中很简单,就是随机构造x和y构成一个任务最后放入阻塞队列中。而在实际情况中加载任务的时候没有那么简单,有时需要从网络或者数据库中加载,这就需要消耗很长的一段时间。这个模型的优势就是在于多个生产者能同时加载多个任务,随后竞争出一名生产者将任务放入共享资源(阻塞队列)中,然后在竞争出一名消费者取出任务。因此模型的优势不在于多个线程能并发的从阻塞队列中拿数据处理,而是在加载任务的时候做到并发节省时间。同理处理任务,当一个线程处理任务的同时,另一个线程仍可以从阻塞队列取出任务处理,不影响之前的进程处理任务,因此消费者处理任务的环节也做到了并发,实现了加载任务和处理任务的解耦,提高了整个程序的运行效率和速度。

到这里本章的内容就全部结束了,创作不易,希望大家多多点赞支持。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/977769.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pytorch学习:卷积神经网络—nn.Conv2d、nn.MaxPool2d、nn.ReLU、nn.Linear和nn.Dropout

文章目录 1. torch.nn.Conv2d2. torch.nn.MaxPool2d3. torch.nn.ReLU4. torch.nn.Linear5. torch.nn.Dropout 卷积神经网络详解&#xff1a;csdn链接 其中包括对卷积操作中卷积核的计算、填充、步幅以及最大值池化的操作。 1. torch.nn.Conv2d 对由多个输入平面组成的输入信号…

ChatGPT AIGC 完成超炫酷的大屏可视化

大屏可视化一直各大企业进行数据决策的重要可视化方式,接下来我们先来看一下ChatGPT,AIGC人工智能帮我们实现的综合案例大屏可视化效果: 像这样的大屏可视化使用HTML,JS,Echarts就可以来完成,给ChatGPT,AIGC发送指令的同时可以将数据一起发送给ChatGPT。 第一段指令加数…

Direct3D绘制旋转立方体例程

初始化文件见Direct3D的初始化_direct3dcreate9_寂寂寂寂寂蝶丶的博客-CSDN博客 D3DPractice.cpp #include <windows.h> #include "d3dUtility.h" #include <d3dx9math.h>IDirect3DDevice9* Device NULL; IDirect3DVertexBuffer9* VB NULL; IDirect3…

【C语言】入门——结构体

目录 结构体 为什么有结构体&#xff1f; 1.结构体的声明 1.2结构体变量的访问和初始化 2.结构体成员的访问 结构体 struct 结构体类型 {//相关属性; }结构体变量; 结构体和数组不同&#xff0c;同一类型的数据的集合是数组&#xff1b; 结构体是多种类型的数据的集合&…

NSV60600MZ4T1G 双极型晶体管(BJT)学习总结

双极型晶体管的起源: 双极型晶体管是在1947年发明的&#xff0c;第一个晶体管是将两条具有尖锐端点的金属线与锗衬底(germanium substrate)形成点接触(point contact)&#xff0c;以今天的水准来看&#xff0c;此第一个晶体管虽非常简陋但它却改变了整个电子工业及人类的生活方…

CANdelaStudio CDD编写方法

本文是基于CANdelaStudio12.0讲解 一.把DTC从Excel导入cdd的方法 问题一&#xff1a;当导入DTC的xxx.cdi文件报如下红色错误 可能原因&#xff1a;在设置具有下拉框的属性的内容时&#xff0c;输入的内容不在下拉框列表中 解决办法:在.cddt文件中更新“”Error Code Table“”…

通达信趋向指标DMI公式详解

DMI指标(Directional Movement Index)也称趋向指标或动向指标&#xff0c;是用于衡量市场的趋势方向以及趋势强度的一种技术指标&#xff0c;由著名的技术派大师威尔斯威尔德(Welles Wilder)于1978年发表在《技术交易系统新概念》这本书中。威尔斯威尔德(Welles Wilder)这位大佬…

企微SCRM营销平台MarketGo-ChatGPT助力私域运营

一、前言 ChatGPT是由OpenAI&#xff08;开放人工智能&#xff09;研发的自然语言处理模型&#xff0c;其全称为"Conversational Generative Pre-trained Transformer"&#xff0c;即对话式预训练转换器。它是GPT系列模型的最新版本&#xff0c;GPT全称为"Gene…

springboot项目中application.properties无法变成小树叶问题解决

1.检查我们的resources目录的状态&#xff0c;看看是不是处在普通文件夹的状态&#xff0c;如果是的话&#xff0c;我们需要重新mark一下 右键点击文件夹&#xff0c;选择mark directory as → resources root 此时我们发现配置文件变成了小树叶 2.如果执行了上述方法还是不行…

[uniapp]踩坑日记 unexpected character > 1或‘=’>1 报错

在红色报错文档里下滑&#xff0c;找到Show more 根据提示看是缺少标签&#xff0c;如果不是缺少标签&#xff0c;看看view标签内容是否含有<、>、>、<号,把以上符合都进行以<号为例做{{“<”}}处理

超详细-Vivado配置Sublime+Sublime实现VHDL语法实时检查

目录 一、前言 二、准备工作 三、Vivado配置Sublime 3.1 Vivado配置Sublime 3.2 环境变量添加 3.3 环境变量验证 3.4 Vivado设置 3.5 配置验证 3.6 解决Vivado配置失败问题 四、Sublime配置 4.1 Sublime安装Package Control 4.2 Sublime安装VHDL插件 4.3 语法检查…

排序相关问题

本篇博客在B站做了内部分享,标题为「排序相关问题」 MySQL的ORDER BY有两种排序实现方式&#xff1a; 利用有序索引获取有序数据 (不得不进行)文件排序 在explain中分析时&#xff0c;利用有序索引获取有序数据显示Using index&#xff0c;文件排序显示Using filesort。 1. 能够…

macm1环境下jdk版本切换

macm1环境下jdk版本切换 本文目录 macm1环境下jdk版本切换下载jdk安装动态切换jdk终端生效全局生效 参考 下载jdk oracle官方源下载地址 https://www.oracle.com/java/technologies/downloads/#jdk17-mac Azul下载地址 https://www.azul.com/downloads/?packagejdk#download…

Autosar-Runnables(可运行实体)

文章目录 Runnable entities (简称Runnables)一、Runnables的定义二、Runnables的作用三、DaVinci配置总结Runnable entities (简称Runnables) 包含实际实现的函数(具体的逻辑算法或者操作) Runables由RTE周期性、或事件触发调用(如,当接收到数据、被操作调用) 一、Runna…

时序预测 | MATLAB实现ELM极限学习机时间序列预测未来

时序预测 | MATLAB实现ELM极限学习机时间序列预测未来 目录 时序预测 | MATLAB实现ELM极限学习机时间序列预测未来预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.MATLAB实现ELM极限学习机时间序列预测未来&#xff1b; 2.运行环境Matlab2018及以上&#xff0c;data为数…

MYSQL MHA实现故障转移和自动切换

目录 1、MHA理论&#xff1a; 1.1、MHA概述 1.2、MHA的组成&#xff1a; 1.3、特点&#xff1a; 1.4、传统的MySQL主从架构存在一些常见的问题&#xff1a; 1.5、MHA工作原理总结如下 1.6、 故障切换备选主库的算法&#xff1a; 2、 故障转移实验 2.1、搭建 MySQL MHA…

Linux知识点 -- 网络编程套接字

Linux知识点 – 网络编程套接字 文章目录 Linux知识点 -- 网络编程套接字一、预备知识1.认识端口号2.套接字3.TCP协议与UDP协议4.网络字节序 二、socket编程接口1.socket常见API2.sockaddr结构 三、UDP套接字编程1.直接打印客户端信息2.执行客户端发来的指令3.多用户聊天4.在wi…

ALBEF、VLMO、BLIP、BLIP2、InstructBLIP要点总结(WIP)

ALBEF&#xff08;ALign BEfore Fuse&#xff09; 为什么有5个loss&#xff1f; 两个ITC两个MIM1个ITM。ITM是基于ground truth的&#xff0c;必须知道一个pair是不是ground truth&#xff0c;同时ITM loss是用了hard negative&#xff0c;这个是和Momentum Distillation&…

优化爬虫效率:利用HTTP代理进行并发请求

网络爬虫作为一种自动化数据采集工具&#xff0c;广泛应用于数据挖掘、信息监测等领域。然而&#xff0c;随着互联网的发展和网站的增多&#xff0c;单个爬虫往往无法满足大规模数据采集的需求。为了提高爬虫的效率和性能&#xff0c;我们需要寻找优化方法。本文将介绍一种利用…

(位运算) 剑指 Offer 56 - I. 数组中数字出现的次数 ——【Leetcode每日一题】

❓剑指 Offer 56 - I. 数组中数字出现的次数 难度&#xff1a;中等 一个整型数组 nums 里除两个数字之外&#xff0c;其他数字都出现了两次。请写程序找出这两个只出现一次的数字。要求时间复杂度是 O ( n ) O(n) O(n)&#xff0c;空间复杂度是 O ( 1 ) O(1) O(1)。 示例 …