生产消费者模式

news2024/11/27 22:27:51

6. 生产消费者模式 Producer-Consumer模式

6.1 概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的

image-20240926174733001

由于多个线程都访问了同一个阻塞队列,所以会有并发问题

  • 生产者 vs 生产者:互斥
  • 消费者 vs 消费者:互斥
  • 生产者 vs 消费者:互斥,为了防止饥饿问题,也需要同步

所以这里有3种关系,2个角色(生产者和消费者),1个交易场所(特定结构的内存空间)


该模型的优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

注意,生产者生产的数据也是要花时间获取的,当有数据时,消费者做数据的加工处理,也是要花时间的

所以,不要只看到生产者生产数据到队列的过程,当阻塞队列队列满时,生产者在等待队列下等待过程中,是可以做获取数据的工作的。
同理,不要只看到消费者从队列中消费数据的过程,当阻塞队列为空时,消费者在等待队列下等待过程中,是可以做数据的加工处理动作的。
这样,这两个或者多个线程就并发高效的处理数据了,在多生产和多消费体现明显,少量的线程在等待,大量的线程在获取数据和加工数据。

6.2 基于 BlockingQueue 的生产者-消费者模式

这里是单生产者单消费者

// BlockQueue.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <queue>
using namespace std;

template<class T>
class BlockQueue
{
    static const int NUM = 10;
public:
    BlockQueue(int num = NUM) : _max(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pCond, nullptr);
        pthread_cond_init(&_cCond, nullptr);
    }

    const T pop()
    {
        pthread_mutex_lock(&_mutex);
        // 判断临界资源是否满足也是在访问临界资源,所以要在加锁之后
        if(_q.size() == 0) {
            // 队列中没有值,不需要再删除了,让它去等待队列中去等
            pthread_cond_wait(&_cCond, &_mutex);	// 调用的时候会自动释放锁,因唤醒而返回时,又会重新持有锁
        }
        const T x = _q.front();
        _q.pop();
        pthread_cond_signal(&_pCond);   // 消费者已经消费数据了,此时就可以唤醒生产者了。
        pthread_mutex_unlock(&_mutex);
        return x;
    }

    void push(const T& x)
    {   
        pthread_mutex_lock(&_mutex);
        if(_q.size() == _max) {
            // 已经到了最大值,不需要再添加了,让它去等待队列中去等
            pthread_cond_wait(&_pCond, &_mutex);
        }
        _q.push(x);
        pthread_cond_signal(&_cCond);   // 生产者已经生产数据了,此时就可以唤醒消费者了。
        pthread_mutex_unlock(&_mutex);  

    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pCond);
        pthread_cond_destroy(&_cCond);
    }

private:
    queue<T> _q;
    pthread_mutex_t _mutex;
    pthread_cond_t _cCond;  // 消费者条件变量
    pthread_cond_t _pCond;  // 生产者条件变量
    size_t _max;    // 队列能放的最大值
};
// main.cc
#include "BlockQueue.hpp"

void* Consumer(void* args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    while(true) {
        int data = bq->pop();
        printf("消费者消费了一个数据:%d\n", data);
        sleep(2);
    }
    
}

void* Producer(void* args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    int data = 0;
    while(true) {
        data++;
        bq->push(data);
        printf("生产者生产了一个数据:%d\n", data);
    }
}

int main()
{
    BlockQueue<int>* bq = new BlockQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, bq);
    pthread_create(&p, nullptr, Producer, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq;
    return 0;
}

因为消费者消费的比较慢,所以消费者消费一个,生产者就生产一个

s

6.3 生产和消费Task

阻塞队列中存储的不仅仅可以是int,可以是自定义类型

// Task.hpp
#include <iostream>
#include <string>
using namespace std;

enum TaskStatus
{
    DIV_ZERO=1,
    MOD_ZERO,
    UNKOWN
};

char opStr[4] = {'+','-','*','/'};

class Task
{
public:
    Task(int a, int b, char op) : _a(a), _b(b), _op(op) {}

    void Run()
    {
        switch (_op)
        {
            case '+':
                _result = _a + _b;
                break;
            case '-':
                _result = _a - _b;
                break;
            case '*':
                _result = _a * _b;
                break;
            case '/':
                if (_b == 0)    _exitCode = DIV_ZERO;
                else    _result = _a / _b;
                break;
            case '%':
                if (_b == 0)    _exitCode = MOD_ZERO;
                else    _result = _a % _b;
                break;
            default:
                _exitCode = UNKOWN;
                break;
        }
    }

    string GetResult()
    {
        string resultStr;
        resultStr += to_string(_a);
        resultStr += _op;
        resultStr += to_string(_b);
        resultStr += '=';
        resultStr += to_string(_result);
        resultStr += "  ";
        if (_exitCode != 0)
        {
            resultStr += "error";
            resultStr += "  ";
            switch (_exitCode)
            {
                case DIV_ZERO:
                    resultStr += "div zero";
                    break;
                case MOD_ZERO:
                    resultStr += "mod zero";
                    break;
                case UNKOWN:
                    resultStr += "unkown";
                    break;
            }
        }
        else
            resultStr += "ok";
        return resultStr;
    }

    string GetTask()
    {
        string resultStr;
        resultStr += to_string(_a);
        resultStr += _op;
        resultStr += to_string(_b);
        resultStr += '=';
        resultStr += "???";
        return resultStr;
    }
    
private:
    int _a = 0;
    int _b = 0;
    char _op = ' ';
    int _exitCode = 0;
    int _result = 0;
};
// main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"

void* Consumer(void* args)
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
    while(true) {
        Task t = bq->pop();
        t.Run();
        printf("消费者获得数据: %s, 运算结果是: %s\n", t.GetTask().c_str(), t.GetResult().c_str());
        // sleep(1);
    }
    
}

void* Producer(void* args)
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
    int x = 0, y = 0;
    char op = ' ';
    while(true) {
        x = rand() % 100;     // [0, 99]
        y = rand() % 100;     // [0, 99]
        op = opStr[rand() % 4];
        Task t(x, y, op);
        usleep(10);              // 模拟获取数据需要时间
        bq->push(t);
        printf("生产者生产了一个任务:%s\n", t.GetTask().c_str());
        // sleep(1);
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    BlockQueue<Task>* bq = new BlockQueue<Task>();
    pthread_t c, p;
    pthread_create(&p, nullptr, Producer, bq);
    pthread_create(&c, nullptr, Consumer, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq;
    return 0;
}

image-20241004114412458

6.4 误唤醒

假设现在有一个消费者,多个生产者在它的条件变量下正在进行等待,现在消费者刚刚消费一个数据,阻塞队列于是就恰好有一个空位置,于是该消费者就去唤醒生产者,但是使用的不是pthread_cond_signal(&_pCond);而是pthread_cond_broadcast(&_pCond);

void push(const T& x)
{   
    pthread_mutex_lock(&_mutex);
    if(_q.size() == _max) {
        // 已经到了最大值,不需要再添加了,让它去等待队列中去等
        pthread_cond_wait(&_pCond, &_mutex);
    }
    _q.push(x);
	pthread_cond_signal(&_cCond);   // 生产者已经生产数据了,此时就可以唤醒消费者了。
    pthread_mutex_unlock(&_mutex);  
}

于是,这几个被唤醒的生产者就不在条件变量下等待了,而是都跑过去去竞争锁,但只有一个能竞争成功,该生产者于是就继续向下执行,向阻塞队列中push()数据。当该生产者生产完数据,准备唤醒消费者并解锁的时候,消费者不一定竞争成功锁,因为还有那些同样在刚才竞争锁资源失败的生产者在锁资源下等着呢!如果这些生产者又拿到锁,像阻塞队列中push()数据,就会发生错误,因为阻塞队列已经到_max了!

一个线程在条件满足被唤醒的时候,但是历史上的条件满足已经其它线程处理掉了,于是该线程只能等待,当它再次被唤醒,进行数据访问,就可能出错,我们把这种现象称为该线程被误唤醒了。或者叫虚假唤醒。即本不应该被唤醒线程被唤醒了,导致程序执行结果错误。


为了防止线程被虚假唤醒,判断临界资源是否可以被消费或生产的时候要用while()循环,循环判断,当条件不满足的时候,让该线程重新去等待队列中等待,而不是一股脑的一直在竞争锁资源

const T pop() 
{
    // ...
    while(_q.size() == 0) {}
    // ...
}
void push(const T& x)
{
    // ...
	while(_q.size() == _max) {}
    // ...
}

6.5 多生产者多消费者

static const size_t C_NUM = 5;
static const size_t P_NUM = 5;
// ...
int main()
{
    srand((unsigned int)time(nullptr));
    BlockQueue<Task>* bq = new BlockQueue<Task>();
    pthread_t c[C_NUM], p[P_NUM];
    for(size_t i = 0; i < P_NUM;++i)
        pthread_create(c+i, nullptr, Producer, bq);
    for(size_t i = 0; i < C_NUM;++i)
    pthread_create(p+i, nullptr, Consumer, bq);

    for(size_t i = 0; i < P_NUM;++i)
        pthread_join(c[i], nullptr);
    for(size_t i = 0; i < C_NUM;++i)
        pthread_join(p[i], nullptr);
    delete bq;
    return 0;
}

image-20241004144049161

由于BlockQueue中用了一把锁,所以生产者和生产者的互斥关系,生产者和消费者的互斥关系,消费者和消费者的互斥问题,都可以解决
生产和消费者的同步问题,我们使用了两个条件变量来解决
即便是多生产者多消费者,任何时刻,只允许一个线程来访问临界资源,让它来进行生产或消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2189732.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决TortoiseGit文件夹图标不见的问题。

打开注册表&#xff0c;\HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\ShellIconOverlayIdentifiers\ &#xff0c;把里面的TortoiseGit开头的前面多补几个空格&#xff0c;让它们排到靠前的位置&#xff0c;然后重启电脑。 据说是windows只有前11/…

在线点餐堂食系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;商品管理&#xff0c;基础数据管理&#xff0c;论坛管理&#xff0c;公告信息管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页&#xff0c;商品&#xff0c;…

AL生成文章标题指定路径保存:创新工具助力内容创作高效启航

在信息爆炸的时代&#xff0c;一个吸引人的标题是文章成功的第一步。它不仅要准确概括文章内容&#xff0c;还要能激发读者的好奇心&#xff0c;促使他们点击阅读。随着人工智能技术的飞速发展&#xff0c;AL生成文章标题功能正逐渐成为内容创作者的新宠&#xff0c;看看它是如…

Mysql数据库--聚合查询、分组查询、联合查询(不同的连接方式)

文章目录 1.查询的进阶版1.1查询搭配插入进行使用1.2聚合查询1.3group by分组查询1.4联合查询之笛卡尔积1.5左外连接&#xff0c;右外连接介绍join on1.6自连表 1.查询的进阶版 1.1查询搭配插入进行使用 我们首先创建两张表&#xff0c;一个叫做student,一个叫做student2,两个…

DenseNet算法:口腔癌识别

本文为为&#x1f517;365天深度学习训练营内部文章 原作者&#xff1a;K同学啊 一 DenseNet算法结构 其基本思路与ResNet一致&#xff0c;但是它建立的是前面所有层和后面层的密集连接&#xff0c;它的另一大特色是通过特征在channel上的连接来实现特征重用。 二 设计理念 三…

遥感影像-语义分割数据集:云及云阴影数据集详细介绍及训练样本处理流程

原始数据集详情 简介&#xff1a;数据集包括108个GF-1宽幅&#xff08;WFV&#xff09;的云和云阴影掩码&#xff0c;该数据集用于GF-1 WFV图像中的云和云阴影检测。 KeyValue卫星类型高分一宽幅覆盖区域未知场景未知分辨率16m数量108张单张尺寸17344*15627原始影像位深16位标…

如何在银河麒麟服务器中获取关键日志信息

如何在银河麒麟服务器中获取关键日志信息 1、获取messages日志2、获取dmesg输出 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在银河麒麟服务器中&#xff0c;获取messages和dmesg日志是排查问题的重要步骤。 1、dmesg命令用于显示或控制…

【深度学习基础模型】深度残差网络(Deep Residual Networks, DRN)详细理解并附实现代码。

【深度学习基础模型】深度残差网络&#xff08;Deep Residual Networks, DRN&#xff09;详细理解并附实现代码。 【深度学习基础模型】深度残差网络&#xff08;Deep Residual Networks, DRN&#xff09;详细理解并附实现代码。 文章目录 【深度学习基础模型】深度残差网络&a…

C++ | Leetcode C++题解之第457题环形数组是否存在循环

题目&#xff1a; 题解&#xff1a; class Solution { public:bool circularArrayLoop(vector<int>& nums) {int n nums.size();auto next [&](int cur) {return ((cur nums[cur]) % n n) % n; // 保证返回值在 [0,n) 中};for (int i 0; i < n; i) {if …

【人工智能深度学习应用】妙搜API最佳实践

功能概述 AI妙搜通过集成夸克通用搜索引擎&#xff0c;能够提供一个强大的搜索素材功能&#xff0c;大大提升内容创作者在寻找和使用网络资源时的效率和便捷性。用户只需输入相关的关键词或描述&#xff0c;系统将根据用户的搜索词在互联网上进行搜索&#xff0c;并展示与搜索…

【3D目标检测】激光雷达和相机联合标定(一)——ROS同步解包

ROS同步解包 引言1 鱼香ROS一键安装ros-docker脚本&#xff1a;2 指定目录映射3 数据解包3.1 解包脚本3.2 依赖安装3.3 运行脚本&#xff0c;解包 引言 总结步骤如下&#xff1a; 采集同步数据&#xff1a;ROS录制&#xff08;推荐&#xff09;&#xff0c;或者代码同步触发采…

C++入门基础知识99——【关于C++ 成员运算符】

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///C爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于C 成员运算符的相关内容&#xff01; 关…

昇思学习打卡营第32天|基于ResNet50的中药炮制饮片质量判断模型

背景介绍 中药炮制是根据中医药理论&#xff0c;依照临床用药需求&#xff0c;通过调剂和制剂要求&#xff0c;将中药材制备成中药饮片的过程。老百姓日常使用的中药饮片&#xff0c;是中药炮制技术的成果。中药炮制过程中&#xff0c;尤其是涉及到水火处理时&#xff0c;必须注…

CNN模型对CIFAR-10中的图像进行分类

代码功能 这段代码展示了如何使用 Keras 和 TensorFlow 构建一个卷积神经网络&#xff08;CNN&#xff09;模型&#xff0c;用于对 CIFAR-10 数据集中的图像进行分类。主要功能包括&#xff1a; 加载数据&#xff1a;从 CIFAR-10 数据集加载训练和测试图像。 数据预处理&#…

HTTP【网络】

文章目录 HTTPURL(Uniform Resource Lacator) HTTP协议格式HTTP的方法HTTP的状态码HTTP常见的Header HTTP 超文本传输协议&#xff0c;是一个简单的请求-响应协议&#xff0c;HTTP通常运行在TCP之上 URL(Uniform Resource Lacator) 一资源定位符&#xff0c;也就是通常所说的…

NIM简单实践-图像分割

项目背景 我正在学习一个图像分割的 Demo&#xff0c;使用 NVIDIA 提供的预训练大模型进行光学字符检测 (OCDNet) 和光学字符识别 (OCRNet)。这些模型专门为光学字符检测和识别设计&#xff0c;能够自动将图像中的字符进行分割和识别。 预训练模型介绍 OCDNet (Optical Char…

Windows NTLM中继攻击(PortBender二进制可执行文件)

Windows NTLM中继攻击&#xff08;PortBender二进制可执行文件) 前言 最近在完善自己的一套TTPs&#xff08;战术、技术和程序&#xff09;以应对未来的网络作战、项目和攻防演练需求&#xff0c;翻到了PortBender&#xff0c;我觉得不依赖C2和影响主机本身实现这一切非常有趣…

如何使用ssm实现民族大学创新学分管理系统分析与设计+vue

TOC ssm763民族大学创新学分管理系统分析与设计vue 第1章 绪论 1.1 课题背景 二十一世纪互联网的出现&#xff0c;改变了几千年以来人们的生活&#xff0c;不仅仅是生活物资的丰富&#xff0c;还有精神层次的丰富。在互联网诞生之前&#xff0c;地域位置往往是人们思想上不…

Linux 生产者消费者模型

前言 生产者消费者模型&#xff08;CP模型&#xff09;是一种十分经典的设计&#xff0c;常常用于多执行流的并发问题中&#xff01;很多书上都说他很高效&#xff0c;但高效体现在哪里并没有说明&#xff01;本博客将详解&#xff01; 目录 前言 一、生产者消费者模型 1.…

绝美的登录界面!滑动切换效果

绝美登录界面&#xff01;添加了管理员账号和测试账号 <!DOCTYPE html> <html lang"zh-CN"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><scri…