linux:生产者消费者模型

news2025/1/11 16:54:33

在这里插入图片描述

个人主页 : 个人主页
个人专栏 : 《数据结构》 《C语言》《C++》《Linux》

文章目录

  • 前言
  • 一、生产者消费者模型
  • 二、基于阻塞队列的生产者消费者模型
    • 代码实现
  • 总结


前言

本文是对于生产者消费者模型的知识总结


一、生产者消费者模型

生产者消费者模型就是通过一个容器来解决生产者消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过之间的容器来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接交给容器,消费者不找生产者要数据,而是直接从容器中取数据,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,其中这个容器就是用于生产者和消费者之间解耦的
(强耦合是指两个或多个系统,组件或模块之间存在紧密依赖关系。)

在这里插入图片描述


特点:

  1. 三种关系:生产者与生产者之间(互斥),消费者与消费者之间(互斥),生产者与消费者之间(互斥 && 同步)
  2. 两种角色:生产者和消费者
  3. 一个交易(通讯)场所:一个容器(一段内存空间)

因为我们是多个线程访问同一个容器,那必然会导致数据不一致的问题,所以我们需要对该临界资源加锁,所以生产者与生产者之间,消费者与消费者之间,生产者与消费者之间都是互斥的。
又因为容器可能为空(满),此时消费者(生产者)还一直在临界区申请锁,又因没有数据(空间)而释放锁,从而不断申请锁释放锁,导致生产者(消费者)的饥饿问题。此时我们就需要生产者与消费者之间的同步。

对于2,3两点,这很好理解不解释。
我们编写生产者消费者模型的本质就是对以上三点的维护。(互斥保证数据安全,同步保证效率)


优点:

  1. 解耦
  2. 支持并发
  3. 支持忙闲不均

对于第一点,不就是生产者与消费者通过容器来解耦提升效率(如果没有这个容器,则生产者生产完数据,就必须等待消费者来接受处理数据,不能立刻继续生产数据)。
对于第二点,当生产者在生产数据时,消费者也同时在处理数据
对于第三点,当生产者生产数据的速度超过消费者的处理能力时,容器可以起到缓存的作用,将多余的数据暂时存储,等待消费者有空闲时再进行处理。如果消费者处理数据的能力超过生产者时,同理。

二、基于阻塞队列的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

  • 队列为空时,从队列中取数据将被阻塞,直到队列中有数据时被唤醒。
  • 队列为满时,向队列中放入数据将被阻塞,直到队列中有数据取出被唤醒。

代码实现

下面是一个单生成单消费模型
在这里插入图片描述
LockGuard.hpp 文件 将加锁释放锁,交给一个对象处理,当对象创建加锁,对象销毁释放锁

#pragma once
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *mutex):_mutex(mutex)
    {}

    void Lock()
    {
        pthread_mutex_lock(_mutex);
    }

    void UnLock()
    {
        pthread_mutex_unlock(_mutex);
    }

    ~Mutex()
    {}
private:
    pthread_mutex_t *_mutex;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex): _lock(mutex)
    {
        _lock.Lock();
    }

    ~LockGuard()
    {
        _lock.UnLock();
    }
private:
    Mutex _lock;
};

Blockqueue.hpp 文件

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "LockGuard.hpp"

using namespace std;
const int CAPACITY = 5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(int cap = CAPACITY):_capacity(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p, nullptr);
        pthread_cond_init(&_c, nullptr);
    }

    bool isFull()
    {
        return _bq.size() == _capacity;
    }

    void Push(const T &in)
    {
        LockGuard mutex(&_mutex);
        //pthread_mutex_lock(&_mutex);
        while(isFull())
        {
            pthread_cond_wait(&_p, &_mutex);
        }

        _bq.push(in);
        // 唤醒策略为 生产一个,消费一个
        pthread_cond_signal(&_c);

        //pthread_mutex_unlock(&_mutex);
    }

    bool isEmpty()
    {
        return _bq.size() == 0;
    }

    void Pop(T *out)
    {
        LockGuard mutex(&_mutex);
        //pthread_mutex_lock(&_mutex);
        while(isEmpty())
        {
            pthread_cond_wait(&_c, &_mutex);
        }

        *out = _bq.front();
        _bq.pop();
        // 唤醒策略为 消费一个,生产一个
        pthread_cond_signal(&_p);

        //pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p);
        pthread_cond_destroy(&_c);
    }
private:
    queue<T> _bq;
    int _capacity;

    pthread_mutex_t _mutex;
    pthread_cond_t _p;
    pthread_cond_t _c;
};

Task.hpp 文件

#pragma once
#include <string>

const char *opers = "+-*/%";

enum
{
    ok = 0,
    div_zero,
    mod_zero
};

class Task
{
public:
    Task()
    {}

    Task(int x, int y, char op) : _data_x(x), _data_y(y), _oper(op)
    {
        _code = ok;
    }

    void Run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data_x + _data_y;
            break;
        case '-':
            _result = _data_x - _data_y;
            break;
        case '*':
            _result = _data_x * _data_y;
            break;
        case '/':
            {
                if(_data_y == 0)
                {
                    _code = div_zero;
                }
                else
                {
                    _result = _data_x / _data_y;
                }
            }
            break;
        case '%':
            {
                if(_data_y == 0)
                {
                    _code = mod_zero;
                }
                else
                {
                    _result = _data_x % _data_y;
                }
            }
            break;
        default:
            break;
        }
    }

    void operator()()
    {
        Run();
    }

    std::string PrintTask()
    {
        std::string ret = std::to_string(_data_x);
        ret += _oper;
        ret += std::to_string(_data_y);

        ret += "=?";
        return ret;
    }

    std::string PrintResult()
    {
        std::string ret = std::to_string(_data_x);
        ret += _oper;
        ret += std::to_string(_data_y);

        ret += "=";
        if(_code == ok)
        {
            ret += std::to_string(_result);
        }
        else
        {
            ret += "?";
        }

        ret += "[";
        ret += std::to_string(_code);
        ret += "]";

        return ret;
    }

    ~Task()
    {}

private:
    int _data_x;
    int _data_y;
    char _oper;

    int _result;
    int _code; // 错误码
};

Main.cc 文件

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <string.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace std;


void *producer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    // 产生任务
    while (true)
    {
        int x = rand() % 10 + 1;
        int y = rand() % 10 + 1;
        char oper = opers[rand() % strlen(opers)];

        Task task(x, y, oper);
        cout << "producer: " << task.PrintTask() << endl;
        bq->Push(task);
        sleep(1);
    }

    return nullptr;
}

void *consumer(void *args)
{
    // usleep(1000);
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    // 获取任务,处理任务
    while (true)
    {
        if (bq->isFull())
        {
            Task task;
            bq->Pop(&task);

            task();
            cout << "consumer: " << task.PrintResult() << endl;
            //sleep(1);
        }
    }
}

int main()
{
    srand(time(nullptr) ^ getpid());
    BlockQueue<Task> bq;

    pthread_t p;
    pthread_create(&p, nullptr, producer, (void *)&bq);

    pthread_t c;
    pthread_create(&c, nullptr, consumer, (void *)&bq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    return 0;
}

在这里插入图片描述
在这里插入图片描述

那如何将这个单生产单消费该为多生产多消费呢?因为多生产多消费本质也是多个线程访问临界资源,那我们单生产和单消费不也是多个线程访问临界资源吗,所以我们不需要对BlockQueue.hpp文件进行修改,只需要在main函数中,创建多个生产者和消费者即可。

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <string.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace std;

template <class T>
class ThreadData
{
public:
    ThreadData(pthread_t tid, const string threadname, BlockQueue<T> *bq)
        : _tid(tid), _threadname(threadname), _bq(bq)
    {}

public:
    pthread_t _tid;
    string _threadname;
    BlockQueue<T>* _bq;
};

void *producer(void *args)
{
    ThreadData<Task> *data = static_cast<ThreadData<Task> *>(args);
    // 产生任务
    while (true)
    {
        int x = rand() % 10 + 1;
        int y = rand() % 10 + 1;
        char oper = opers[rand() % strlen(opers)];

        Task task(x, y, oper);
        cout << data->_tid << ", " << data->_threadname <<": " << task.PrintTask() << endl;
        data->_bq->Push(task);
        sleep(1);
    }

    return nullptr;
}

void *consumer(void *args)
{
    // usleep(1000);
    ThreadData<Task> *data = static_cast<ThreadData<Task> *>(args);
    // 获取任务,处理任务
    while (true)
    {
        if (data->_bq->isFull())
        {
            Task task;
            data->_bq->Pop(&task);

            task();
            cout << data->_tid << ", " << data->_threadname << ": " << task.PrintResult() << endl;
            // sleep(1);
        }
    }
}



int main()
{
    srand(time(nullptr) ^ getpid());
    BlockQueue<Task> bq;

    pthread_t p1;
    ThreadData<Task> data1(p1, "product-1", &bq);
    pthread_create(&p1, nullptr, producer, (void *)&data1);

    pthread_t p2;
    ThreadData<Task> data2(p2, "product-2", &bq);
    pthread_create(&p2, nullptr, producer, (void *)&data2);

    pthread_t c1;
    ThreadData<Task> data3(c1, "consumer-1", &bq);
    pthread_create(&c1, nullptr, consumer, (void *)&data3);
    pthread_t c2;
    ThreadData<Task> data4(c2, "consumer-2", &bq);
    pthread_create(&c2, nullptr, consumer, (void *)&data4);

    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);

    return 0;
}

在这里插入图片描述
在这里插入图片描述


总结

以上就是我对于线程同步的总结。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1561318.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java封装最佳实践:打造高内聚、低耦合的优雅代码~

​ 个人主页&#xff1a;秋风起&#xff0c;再归来~ 文章专栏&#xff1a;javaSE的修炼之路 个人格言&#xff1a;悟已往之不谏&#xff0c;知来者犹可追 克心守己&#xff0c;律己则安&#xff01; 1、封装 1.1 封装的概念 面向对象程序三大…

数据类型和变量的深入理解

引言&#xff1a;C语言数据类型的意义&#xff0c;数据在内存中的存储情况&#xff0c;变量的声明与定义的区别&#xff0c;和一些关键字。 目录 1.变量的定义与声明 1.1定义与声明 1.2 变量的初始化与赋值 2.C语言常见的数据类型 3.变量的作用域与生命周期 4.signed 和 un…

SpringBoot mybatis-starter解析

mybatis-starter使用指南 自动检测工程中的DataSource创建并注册SqlSessionFactory实例创建并注册SqlSessionTemplate实例自动扫描mappers mybatis-starter原理解析 注解类引入原理 查看对应的autoconfigure包 MybatisLanguageDriverAutoConfiguration 主要是协助使用注解来…

Leetcode 4.1

LeetCode 热题 100 贪心算法1.买卖股票的最佳时机2.跳跃游戏3.跳跃游戏 II4.划分字母区间 区间合并1.合并区间 贪心算法 1.买卖股票的最佳时机 买卖股票的最佳时机 买的那天一定是卖的那天之前的最小值。 每到一天&#xff0c;维护那天之前的最小值即可。 在题目中&#xff0…

红米手机Redmi 不会自动弹出USB调试选项,如何处理?(红米小米均适用)

参考&#xff1a; 红米手机Redmi 不会自动弹出USB调试选项&#xff0c;如何处理&#xff1f;&#xff08;红米小米均适用&#xff09; - 知乎 以红米9A为例&#xff1b; 【设置】菜单进入后&#xff0c;找到【我的设备】&#xff0c; 选择【全部参数】&#xff0c; 对准miui版…

npm ERR! code CERT_HAS_EXPIRED 淘宝镜像失效

近期vue安装失败&#xff0c;具体如下&#xff1a; 1.先npm cache clean --force 再下载 插件后缀加上 --legacy-peer-deps 2.certificate has expired npm ERR! code CERT_HAS_EXPIRED npm ERR! errno CERT_HAS_EXPIRED npm ERR! request to https://registry.npm.taobao.o…

【面试题】RocketMQ怎么处理消息积压?

如图,消息积压主要是因为&#xff0c;消费能力不足&#xff1a; 在RocketMQ中&#xff0c;处理消息积压的方法可以采取以下几种策略&#xff1a; 增加消费者数量&#xff1a;可以通过增加消费者数量来提高消息的消费速度。通过增加消费者实例或者消费者组的数量&#xff0c;可…

【Consul】Linux安装Consul保姆级教程

【Consul】Linux安装Consul保姆级教程 大家好 我是寸铁&#x1f44a; 总结了一篇【Consul】Linux安装Consul保姆级教程✨ 喜欢的小伙伴可以点点关注 &#x1f49d; 前言 今天要把编写的go程序放到linux上进行测试Consul服务注册与发现&#xff0c;那怎么样才能实现这一过程&am…

Layui三级联动插件使用方法

Layui高版本中没有在提供三级联动这个动画了&#xff0c;而是封装成了一个插件&#xff0c;使用方式也很简单 官网 省市县区三级联动下拉选择器 layarea - Layui 第三方扩展组件平台 (layuion.com)https://dev.layuion.com/extend/layarea/#doc html页面约束 整个选择器需要…

【二叉树】Leetcode 101. 对称二叉树【简单】

对称二叉树 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 示例1&#xff1a; 输入&#xff1a;root [1,2,2,3,4,4,3] 输出&#xff1a;true 解题思路 判断一棵二叉树是否是轴对称的&#xff0c;可以通过递归方式进行判断。 1、定义一个递归函数isMirr…

【C++】C++入门第二课(函数重载 | 引用 | 内联函数 | auto关键字 | 指针空值nullptr)

目录 前言 函数重载 概念 重载函数的条件 C支持重载函数的原理--名字修饰 引用 概念 特性 常引用&#xff08;const引用&#xff09; 使用场景 传值&#xff0c;传引用效率比较 引用和指针的区别 内联函数 概念 特性 auto关键字&#xff08;C11&#xff09; a…

javaWeb项目-家政服务管理系统功能介绍

项目关键技术 开发工具&#xff1a;IDEA 、Eclipse 编程语言: Java 数据库: MySQL5.7 框架&#xff1a;ssm、Springboot 前端&#xff1a;Vue、ElementUI 关键技术&#xff1a;springboot、SSM、vue、MYSQL、MAVEN 数据库工具&#xff1a;Navicat、SQLyog 1、B/S结构简介 B/S…

OpenHarmony实战:使用宏、std::bind 巧妙实现进出函数日志打印

背景 我们始终渴望了解模块的调用、时序逻辑&#xff0c;每个人都会轻易地想到在函数的入口打印一条进入 enter 相关的日志&#xff0c;在函数的出口打印一条离开 leave 相关的日志。不能有遗漏&#xff0c;我们会复制这条日志到所有关心的函数中&#xff0c;为了表明是哪个模…

网络以太网之(1)基础概念

网络以太网之(1)基础概念 Author: Once Day Date: 2024年4月1日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 全系列文档可参考专栏&#xff1a;通信网络技术_Once-Day的…

EI会议相当于国内什么期刊?

EI工程索引收录类型有EI期刊和EI会议&#xff0c;EI会议水平是比较高的&#xff0c;也有的作者喜欢将EI会议与国内刊物作比较&#xff0c;那么EI会议相当于国内什么期刊&#xff1f;这主要看是什么级别的会议&#xff0c;顶尖学术会议相当于国内北大核心期刊。 EI会议应用范围和…

python怎么处理txt

导入文件处理模块 import os 检测路径是否存在&#xff0c;存在则返回True&#xff0c;不存在则返回False os.path.exists("demo.txt") 如果你要创建一个文件并要写入内容 #如果demo.txt文件存在则会覆盖&#xff0c;并且demo.txt文件里面的内容被清空&#xff0c;如…

系统架构图怎么画

画架构图是架构师的一门必修功课。 对于架构图是什么这个问题&#xff0c;我们可以按以下等式进行概括&#xff1a; 架构图 架构的表达 架构在不同抽象角度和不同抽象层次的表达&#xff0c;这是一个自然而然的过程。 不是先有图再有业务流程、系统设计和领域模型等&#…

哈希表(Hash Table) -- 用数组模拟--字符串前缀哈希

本文用于个人算法竞赛学习&#xff0c;仅供参考 目录 一.什么是哈希表 二.哈希函数中的取模映射 三.拉链法&#xff08;数组实现&#xff09; 四.拉链法模板 五.开放寻址法 六.开放寻址法模板 七.字符串前缀哈希 九.字符串前缀哈希 模板 十.题目 一.什么是哈希表 哈希表&…

fork复制进程

1.shell: 在计算机科学中&#xff0c;Shell俗称壳&#xff08;用来区别于核&#xff09;&#xff0c;是指“为使用者提供操作界面”的软件&#xff08;command interpreter&#xff0c;命令解析器&#xff09;。它类似于DOS下的COMMAND.COM和后来的cmd.exe。它接收用户命令&am…

C语言中入门到实战————动态内存管理

目录 前言 一、为什么要有动态内存分配 二、 malloc和free 2.1 malloc 2.2 free 三、calloc和realloc 3.1 calloc 3.2 realloc 四. 常见的动态内存的错误 4.1 对NULL指针的解引用操作 4.2 对动态开辟空间的越界访问 4.3 对非动态开辟内存使用free释放 4.4 使…