【Linux】生产者消费者模型 - 详解

news2025/1/12 16:12:48

目录

一.生产者消费者模型概念

1.为何要使用生产者消费者模型

2.生产者消费者之间的关系

3.生产者消费者模型的优点

二.基于阻塞队列的生产消费模型

1.在阻塞队列中的三种关系

2.BlockingQueue.hpp - 阻塞队列类

3.LockGurad.hpp - RAII互斥锁类

4.Task.hpp - 在阻塞队列中生产/消费的任务类

5.CPTest.cc - 测试

三.基于环形队列的生产消费模型 

1.在环形队列中的三种关系

2.环形队列vs阻塞队列

3.CircularQueue.hpp - 环形队列类

4.LockGuard.hpp - RAII互斥锁类

5.Task.cpp - 任务类

6.CPTest.cc - 测试

四. 写在最后

1.关于条件变量的伪唤醒问题

2.关于信号量和锁的先后顺序问题 


一.生产者消费者模型概念

1.为何要使用生产者消费者模型

生产者消费者模式就是通过一个"容器"来解决生产者与消费者之间的强耦合问题, 生产者和消费者彼此之间不直接通讯, 而是通过特定"容器"来进行通讯, 所以生产者生产完数据之后不需要等待消费者处理, 直接扔给"容器", 消费者不直接找生产者索要数据, 而是从"容器"中拿数据, "容器"其实就是由特定的数据结构编写的缓冲区, 平衡了生产者和消费者的处理能力, 通过生产消费之间的"容器"达到解耦的目的

2.生产者消费者之间的关系

一个场所, 也就是上述"容器"

二个角色, 生产者与消费者

三种关系

        生产者 - 生产者: 互斥关系

        消费者 - 消费者: 互斥关系

        生产者 - 消费者: 互斥关系, 同步关系

生产者消费者模型分为两种

1.单生产单消费

2.多生产多消费

3.生产者消费者模型的优点

1.将生产者消费者的强耦合关系解耦

2.支持并发

二.基于阻塞队列的生产消费模型

1.在阻塞队列中的三种关系

生产者与生产者之间互斥

消费者与消费者之间互斥

生产者与消费者之间既互斥又同步

同步关系使用条件变量

2.BlockingQueue.hpp - 阻塞队列类

#pragma once

#include <iostream>
#include <queue>
#include "LockGurad.hpp"

template <class T>
class BlockingQueue
{
public:
    static BlockingQueue<T> *GetInst()
    {
        return &_sInst;
    }

public:
    void push(const T& in)
    {
        LockGurad lock(&_mutex); // RAII锁
        // 如果阻塞队列满了就阻塞
        while(isFull())
        {
            pthread_cond_wait(&_isFullCond, &_mutex); // 已满, 阻塞
        }
        _bq.push(in);
        pthread_cond_signal(&_isEmptyCond); // 已经添加了一个, 唤醒已空条件阻塞
    }

    void pop(T* out)
    {
        LockGurad lock(&_mutex); // RAII锁
        // 如果阻塞队列空了就阻塞
        while(isEmpty())
        {
            pthread_cond_wait(&_isEmptyCond, &_mutex); // 已空, 阻塞
        }
        *out = _bq.front();
        _bq.pop();
        pthread_cond_signal(&_isFullCond); // 已经取走了一个, 唤醒已满条件阻塞
    }

    ~BlockingQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_isFullCond);
        pthread_cond_destroy(&_isEmptyCond);
    }

private:
    bool isFull() // 判满
    {
        return _bq.size() == _capacity;
    }

    bool isEmpty() // 判空
    {
        return _bq.size() == 0;
    }

private:
    std::queue<T> _bq;          // 阻塞队列
    size_t _capacity;           // 阻塞队列最大容量
    pthread_mutex_t _mutex;      // 保护阻塞队列的锁
    pthread_cond_t _isFullCond;  // 判断是否为满
    pthread_cond_t _isEmptyCond; // 判断是否为空

private:
    // 单例模式
    BlockingQueue(size_t capacity = 5) : _capacity(capacity)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_isFullCond, nullptr);
        pthread_cond_init(&_isEmptyCond, nullptr);
    }
    BlockingQueue(const BlockingQueue<T> &copy) = delete;
    BlockingQueue<T> &operator=(const BlockingQueue<T> &copy) = delete;

    static BlockingQueue<T> _sInst;
};

template <class T>
BlockingQueue<T> BlockingQueue<T>::_sInst;

3.LockGurad.hpp - RAII互斥锁类

#pragma once

#include <pthread.h>

class LockGurad
{
public:
    LockGurad(pthread_mutex_t* mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }

    ~LockGurad()
    {
        pthread_mutex_unlock(_mutex);
    }

private:
    pthread_mutex_t* _mutex;
};

4.Task.hpp - 在阻塞队列中生产/消费的任务类

#pragma once

typedef int(* func_t)(int, int);

struct Task
{
    Task(){};
    Task(int x, int y, func_t func):_x(x),_y(y),_func(func)
    {}

    int operator()()
    {
        return _func(_x, _y);
    }

    int _x;
    int _y;

    func_t _func;
};

5.CPTest.cc - 测试

#include "BlockingQueue.hpp"
#include "Task.hpp"
#include "time.h"
#include "stdlib.h"
#include "sys/types.h"
#include "unistd.h"

int myAdd(int x, int y)
{
    return x + y;
}

void *Consumer(void *args)
{
    BlockingQueue<Task> *bqsInst = (BlockingQueue<Task> *)args;

    while (1)
    {
        // 消费任务
        Task t;
        bqsInst->pop(&t);

        std::cout << pthread_self() << " consumer: " << t._x << "+" << t._y << "=" << t() << std::endl;
        sleep(1);
    }

    return nullptr;
}

void *Provider(void *args)
{
    BlockingQueue<Task> *bqsInst = (BlockingQueue<Task> *)args;

    while (1)
    {
        // 制作任务
        int x = rand() % 100 + 1;
        int y = rand() % 100 + 1;
        Task t(x, y, myAdd);

        // 添加任务
        bqsInst->push(t);

        std::cout << pthread_self() << " productor: " << t._x << "+" << t._y << "=?" << std::endl;
        sleep(1);
    }

    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);

    // 获取单例指针
    BlockingQueue<Task> *bqsInst = BlockingQueue<Task>::GetInst();

    // 单生产单消费
    //pthread_t c, p;
    //pthread_create(&c, nullptr, Consumer, bqsInst);
    //pthread_create(&p, nullptr, Provider, bqsInst);

    // 多生产多消费
    pthread_t c[2], p[3];
    pthread_create(c, nullptr, Consumer, bqsInst);
    pthread_create(c + 1, nullptr, Consumer, bqsInst); 

    pthread_create(p, nullptr, Provider, bqsInst);
    pthread_create(p + 1, nullptr, Provider, bqsInst);
    pthread_create(p + 2, nullptr, Provider, bqsInst);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);

    return 0;
}

三.基于环形队列的生产消费模型 

数据结构: 环形队列, 传送入口: http://t.csdn.cn/p1OXv

1.在环形队列中的三种关系

生产者与生产者之间互斥

消费者与消费者之间互斥

生产者与消费者之间同步

同步关系使用信号量

生产消费不需要互斥, 即分别用两把锁

2.环形队列vs阻塞队列

环形队列对比阻塞队列的优势: 生产与消费可以同时进行, 这是因为环形数据结构的特性, 以及信号量sem共同支持的

为什么?

对于阻塞队列而言

当阻塞队列中还存在有数据时, 生产者与消费者必然不会访问到同一个数据, 但是当队列为空, 如果不将生产与消费互斥的话, 就很可能在生产的同时又在消费, 消费的可能是正在生产中的数据, 就可能发生问题

对于环形队列而言

生产者与消费者的下标, 只有在环形队列满或空时指向同一位置, 通过信号量进行同步之后, 不可能会发生同时在同一位置生产消费的情况

生产消费指向同一位置, 只有两种情况

1.环形队列为满, 此时该位置是还存有未消费的数据, 没有多余的生产信号量, 该位置只可能存在消费行为

2.环形队列为空, 此时该位置不存有任何数据, 没有多余的消费信号量, 该位置只可能存在生产行为

3.CircularQueue.hpp - 环形队列类

#include <vector>
#include <semaphore.h>
#include "LockGurad.hpp"

template <class T>
class CircularQueue
{
public:
    CircularQueue(size_t capacity) : _capacity(capacity), _cStep(0), _pStep(0)
    {
        _v.resize(capacity);

        sem_init(&_spaceSem, 0, capacity);
        sem_init(&_dataSem, 0, 0);

        pthread_mutex_init(&_cMtx, nullptr);
        pthread_mutex_init(&_pMtx, nullptr);
    }

    void push(const T &in)
    {
        // 这里把信号量加在锁前面, 进一步提高效率, 原理类似于买票看电影, 提前买票在排队检票效率更高
        sem_wait(&_spaceSem);
        {
            LockGurad lock(&_pMtx); // RAII加锁风格
            _v[_pStep++] = in;
            _pStep %= _capacity;
        }
        sem_post(&_dataSem);
    }

    void pop(T *out)
    {
        sem_wait(&_dataSem);
        {
            LockGurad lock(&_cMtx);
            *out = _v[_cStep++];
            _cStep %= _capacity;
        }
        sem_post(&_spaceSem);
    }

    ~CircularQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);

        pthread_mutex_destroy(&_cMtx);
        pthread_mutex_destroy(&_pMtx);
    }

private:
    std::vector<T> _v;     // 环形队列
    size_t _capacity; // 最大容量

    int _cStep; // 消费下标
    int _pStep; // 生产下标

    sem_t _spaceSem; // 空间资源信号量
    sem_t _dataSem;  // 数据资源信号量

    pthread_mutex_t _cMtx; // 生产锁
    pthread_mutex_t _pMtx; // 消费锁
};

4.LockGuard.hpp - RAII互斥锁类

#pragma once

#include <pthread.h>

class LockGurad
{
public:
    LockGurad(pthread_mutex_t* mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }

    ~LockGurad()
    {
        pthread_mutex_unlock(_mutex);
    }

private:
    pthread_mutex_t* _mutex;
};

5.Task.cpp - 任务类

#pragma once

typedef int(* func_t)(int, int);

struct Task
{
    Task(){};
    Task(int x, int y, func_t func):_x(x),_y(y),_func(func)
    {}

    int operator()()
    {
        return _func(_x, _y);
    }

    int _x;
    int _y;

    func_t _func;
};

6.CPTest.cc - 测试

#include <iostream>
#include "CircularQueue.hpp"
#include "Task.hpp"
#include <time.h>
#include <unistd.h>
#include <stdlib.h>

int myAdd(int x, int y)
{
    return x + y;
}

void* Consumer(void* args)
{
    CircularQueue<Task>* cq = (CircularQueue<Task>*)args;

    while(1)
    {
        // 消费任务
        Task t;
        cq->pop(&t);

        std::cout << "消费: " << t._x << "+" << t._y << "=" << t() << std::endl;
    }
}

void* Provider(void* args)
{
    CircularQueue<Task>* cq = (CircularQueue<Task>*)args;

    while(1)
    {
        // 制作任务
        int x = rand() % 50 + 1;
        int y = rand() % 50 + 1;
        Task t(x, y, myAdd);

        // 添加任务
        cq->push(t);

        std::cout << "生产: " << t._x << "+" << t._y << "=?" << std::endl;
        sleep(1);
    }
}

int main()
{
    srand((unsigned int)time(nullptr) ^ 0x123);

    CircularQueue<Task>* cq = new CircularQueue<Task>(5);

    pthread_t c[2], p[3];
    pthread_create(c, nullptr, Consumer, cq);
    pthread_create(c+1, nullptr, Consumer, cq);

    pthread_create(p, nullptr, Provider, cq);
    pthread_create(p+1, nullptr, Provider, cq);
    pthread_create(p+2, nullptr, Provider, cq);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);

    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);

    return 0;
}

四. 写在最后

1.关于条件变量的伪唤醒问题

为什么不用if, 而使用while, 原因就是如果此时只有1个资源, 而刚刚有很多个线程在挂起等待, 为了这1个资源全部被唤醒了, 那么就一定只有一个线程真正的拿到资源, 其他线程都是伪唤醒, 避免这种问题的方法就是用while判断, 当线程被唤醒时再循环判断一次, 如果为假就会跳出循环, 如果为真说明是伪唤醒, 则继续挂起等待

2.关于信号量和锁的先后顺序问题 

值得一提的是, 环形队列中生产者消费者的同步关系由信号量来实现, 并且生产者与消费者之间没有互斥关系, 那么生产者与消费者分别使用两把锁维护其各自的互斥关系, 那么有一个值得思考的问题就是信号量和锁谁在前谁在后问题

 答案是: 信号量先申请比较好. 信号量确实访问了临界资源, 但其内部实现也是原子性的, 而且信号量类似于预定机制, 只是提前把资源预定下来, 想要访问资源, 只需要等待取走锁资源即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/343013.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TS 函数重载你还不会?来!我教你

前言&#xff1a; 今天在项目中遇到了后端接口参数类型和接口返回值需要修改的场景&#xff0c;由于这个函数在很多页面都用到了&#xff0c;就导致改完相关 api 函数的时候 TS 疯狂报错&#xff0c;所有的参数和返回值都需要跟着改&#xff0c;一时间头疼。正当我手足无措的时…

为什么选择pnpm

npm V3版本之前 问题1 会将模块对应的依赖装到当前模块下,那么如果有嵌套的话,依赖树就会比较深.这中长路径会导致在windows下是找不到的.有一句玩笑话是宇宙最深的不是黑洞而是nodemodules. 问题2 相同的包如果被不同的包依赖就会下载多个副本,造成磁盘空间的浪费 npm V…

知不知道什么叫米筐量化?怎么来的?

现在量化市场范围越来越大&#xff0c;各种量化系统也是普遍性的了&#xff0c;不过米匡量化这个开发系统通常是由交易接口的专业开发团队开发的的结果&#xff0c;那么米匡量化的终端又是是怎么开发成功的呢&#xff1f;首先&#xff0c;我们可以从api接口的调用来了解&#x…

高通开发系列 - 总目录

By: fulinux E-mail: fulinuxsina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅&#xff01; 你的喜欢就是我写作的动力&#xff01; 目录序言序言 大家好&#xff0c;欢迎进入《高通开发系列》专栏&#xff0c;小编有10余年嵌入式开发经验&#xff0c;…

77页智慧城市顶层设计方案

【版权声明】本资料来源网络&#xff0c;知识分享&#xff0c;仅供个人学习&#xff0c;请勿商用。【侵删致歉】如有侵权请联系小编&#xff0c;将在收到信息后第一时间删除&#xff01;完整资料领取见文末&#xff0c;部分资料内容&#xff1a;篇幅有限&#xff0c;无法完全展…

天猫商城自动化python脚本(仅供初学者学习使用)

作者&#xff1a;Eason_LYC 悲观者预言失败&#xff0c;十言九中。 乐观者创造奇迹&#xff0c;一次即可。 一个人的价值&#xff0c;在于他所拥有的。可以不学无术&#xff0c;但不能一无所有&#xff01; 技术领域&#xff1a;WEB安全、网络攻防 关注WEB安全、网络攻防。我的…

行人检测(人体检测)3:Android实现人体检测(含源码,可实时人体检测)

行人检测(人体检测)3&#xff1a;Android实现人体检测(含源码&#xff0c;可实时人体检测) 目录 行人检测(人体检测)3&#xff1a;Android实现人体检测(含源码&#xff0c;可实时人体检测) 1. 前言 2. 人体检测数据集说明 3. 基于YOLOv5的人体检测模型训练 4.人体检测模型…

开源工具系列4:Nuclei

前言 Nuclei 用于基于模板跨目标发送请求&#xff0c;从而实现零误报并提供对大量主机的快速扫描。Nuclei 提供对各种协议的扫描&#xff0c;包括 TCP、DNS、HTTP、SSL、File、Whois、Websocket、Headless 等。凭借强大而灵活的模板&#xff0c;Nuclei 可用于对各种安全检查进…

ArcGIS中ArcMap创建渔网Create Fishnet:生成指定大小的格网矢量文件

本文介绍在ArcMap软件中&#xff0c;通过“Create Fishnet”工具创建渔网&#xff0c;从而获得指定大小的矢量格网数据的方法。 首先&#xff0c;我们在创建渔网前&#xff0c;需要指定渔网覆盖的范围。这里我们就以四川省为例&#xff0c;在这一范围内创建渔网&#xff1b;其中…

2.13作业【设备树解析,按自己理解】

设备树定义 设备树&#xff08;device tree是描述硬件信息的一种树形结构&#xff0c;设备书文件在linux内核启动后被内核解析。描述一个硬件设备信息的节点我们叫做设备节点&#xff0c;一个设备节点内部包含当前硬件的多个不同属性&#xff0c;相同节点不同属性是以链式结构存…

[Datawhale][CS224W]图神经网络(一)

目录一、导读1.1 当前图神经网络的难点1.2 图神经网络应用场景及对应的相关模型&#xff1a;1.3 图神经网络的应用方向及应用场景二、图机器学习、图神经网络编程工具参考文献一、导读 ​ 传统深度学习技术&#xff0c;如循环神经网络和卷积神经网络已经在图像等欧式数据和信号…

RoI Transformer论文翻译详解

Learning RoI Transformer for Oriented Object Detection in Aerial Images 0.摘要 航空图像中的目标检测是计算机视觉中一个活跃而又具有挑战性的任务&#xff0c;因为它具有鸟瞰视角、高度复杂的背景和变化的物体外观。特别是在航空图像中检测密集的目标时&#xff0c;基于…

01-RTOS

对于裸机而言&#xff0c;对于RTOS而言即&#xff1a;对于裸机&#xff0c;打游戏意味着不能回消息 回消息意味着不能打游戏对于RTOS 打游戏和裸机的切换只需要一个时间片节拍 1ms 从宏观来看 就是同时进行的两件事&#xff08;但要在这两件事情的优先级一样的情况下&#xff0…

HTML面试题

HTML面试题部分知识点梳理 1.如何理解HTML语义化 让页面的内容结构化&#xff0c;便于对浏览器、引擎解析&#xff0c;易于阅读&#xff0c;便于维护理解&#xff0c;利于SEO。 2.H5的新特性 video/audio视频/音频canvas 绘画geolocation 定位 用于定位用户的位置WebSocket…

浅谈业务中台前端设计

做前端中台业务一年多的时间&#xff0c;有一些心得体会&#xff0c;和大家分享分享。 中台是什么中台业务的价值是什么做了哪些前端中台业务如何设计前端中台业务未来展望 中台是什么 百度百科的解释比较言简意赅&#xff1a;“中台&#xff0c;互联网术语&#xff0c;一般…

[数据库]表的约束

●&#x1f9d1;个人主页:你帅你先说. ●&#x1f4c3;欢迎点赞&#x1f44d;关注&#x1f4a1;收藏&#x1f496; ●&#x1f4d6;既选择了远方&#xff0c;便只顾风雨兼程。 ●&#x1f91f;欢迎大家有问题随时私信我&#xff01; ●&#x1f9d0;版权&#xff1a;本文由[你帅…

【MySQL】MySQL 中 WITH 子句详解:从基础到实战示例

文章目录一、什么是 WITH 子句1. 定义2.用途二、WITH 子句的语法和用法1.语法2.使用示例3.优点三、总结"梦想不会碎&#xff0c;只有被放弃了才会破灭。" "Dreams wont break, only abandoned will shatter."一、什么是 WITH 子句 1. 定义 WITH 子句是 M…

LeetCode——1234. 替换子串得到平衡字符串

一、题目 有一个只含有 ‘Q’, ‘W’, ‘E’, ‘R’ 四种字符&#xff0c;且长度为 n 的字符串。 假如在该字符串中&#xff0c;这四个字符都恰好出现 n/4 次&#xff0c;那么它就是一个「平衡字符串」。 给你一个这样的字符串 s&#xff0c;请通过「替换一个子串」的方式&a…

Kubeadm搭建K8S

目录 一、部署步骤 1、实验环境 2、环境准备 3、所有节点安装Docker 4、 所有节点配置K8S源 5、所有节点安装kubeadm&#xff0c;kubelet和kubectl 6、部署 kubernetes Master 节点 7、token制作 8、k8s-node节点加入master节点 9、 master节点安装部署pod网络插件&a…

ChatGPT被玩疯,问“如果美国倒了,世界会怎样?”回答太吓人了

“ChatGPT”大火不见消停…… 最近这些天&#xff0c;想必大家的社交平台都刷爆了“ChatGPT”这个词吧&#xff1f; 作为OpenAI 的语言模型工具&#xff0c;它可以生成文本、回答问题、对话、摘要、翻译等。 特点是语言表达流畅&#xff0c;思维敏捷&#xff0c;可以回答复杂…