Linux生产者消费者模型

news2025/1/12 9:59:18

生产者消费者模型

  • 生产者消费者模型
    • 生产者消费者模型的概念
    • 生产者消费者模型的特点
    • 生产者消费者模型优点
  • 基于BlockingQueue的生产者消费者模型
    • 基于阻塞队列的生产者消费者模型
    • 模拟实现基于阻塞队列的生产消费模型

生产者消费者模型

生产者消费者模型的概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

在这里插入图片描述

生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系:生产者与生产者(竞争与互斥关系)、消费者与消费者(竞争与互斥关系)、生产者与消费者(互斥与同步关系);
  • 两种角色:生产者与消费者;
  • 一个交易场所:通常指内存中的一段缓冲区。

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

因为生产者与消费者之间的容器会被多个访问流进行访问,所以我们就需要将该临界资源使用互斥锁保护起来,防止线程安全问题的发生,因此所有的生产者和消费者都会竞争式的申请锁,生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。

生产者和消费者之间为什么会存在同步关系?

如果生产者一直生产,当空间被填满以后,生产者就会停止生产,消费者也是一样,如果消费者一直消费,空间中数据被消耗完了,消费者也会停止消费。

虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。

生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。

对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合。

基于BlockingQueue的生产者消费者模型

基于阻塞队列的生产者消费者模型

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
在这里插入图片描述
阻塞队列的特点在于:

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。
  • 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。

知识联系: 看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。

模拟实现基于阻塞队列的生产消费模型

我们先以单生产者,单消费者为例:

其中的BlockQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的queue进行实现,下面我们进行一个简单的封装:

#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>
#include <mutex>

#define NUM 5

template <class T>
class BlockQueue
{
private:
    // 判断队列是否满了
    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

    // 判断队列是否为空
    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }

public:
    // 构造函数
    BlockQueue(int capacity = NUM) : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_full, nullptr);
        pthread_cond_init(&_empty, nullptr);
    }

    // 向阻塞队列中插入数据(生产者)
    void push(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_mtx);

        while (isQueueFull())
        {
            // 如果生产者生产过程中数据满了,就阻塞等待
            pthread_cond_wait(&_full, &_mtx);
        }
        _bq.push(in);

        // 解锁
        pthread_mutex_unlock(&_mtx);
        // 唤醒消费者
        pthread_cond_signal(&_empty);
    }

    // 向阻塞队列中获取数据(消费者)
    void pop(T &out)
    {
        // 加锁
        pthread_mutex_lock(&_mtx);

        while (isQueueEmpty())
        {
            // 如果消费者消费过程中数据空了,就阻塞等待
            pthread_cond_wait(&_empty, &_mtx);
        }

        out = _bq.front();
        _bq.pop();

        // 解锁
        pthread_mutex_unlock(&_mtx);
        // 唤醒生产者
        pthread_cond_signal(&_full);
    }

    // 析构函数
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_full);
        pthread_cond_destroy(&_empty);
    }

private:
    // 阻塞队列
    std::queue<T> _bq;
    // 阻塞队列最大容器个数
    int _capacity;
    // 通过互斥锁来保证队列安全
    pthread_mutex_t _mtx;
    // 用来表示_bq是否为满的条件
    pthread_cond_t _full;
    // 用来表示_bq是否为空的条件
    pthread_cond_t _empty;
};

上述代码中需要注意以下几点:

  1. 我们实现的是单生产者与单消费者的生产者消费者模型,所以我们不需要维护生产者与生产者,消费者与消费者的关系,只需要维护生产者与消费者之间的关系;
  2. 我们将BlockQueue中的参数模板化,就不会局限于一种类型,以后就可以很好的进行复用;
  3. 我们将阻塞队列最大容器个数设置为5,表示阻塞队列中存在5个数据以后就不会在进行生生产了,此时你生产者被阻塞;
  4. 由于生产者与消费者都会访问阻塞队列,阻塞队列即为临界资源,我们需要增加互斥锁来保证线程安全的问题;
  5. 生产者向阻塞队列中插入数据时,如果阻塞队列满了,生产者就会被阻塞进行等待,直到消费者获取数据完成以后,阻塞队列中存在空余空间,唤醒生产者,进行生产;同理,消费者获取数据时,如果阻塞队列空了,消费者就会被阻塞进行等待,直到生产者生产数据完成以后,唤醒消费者,进行消费;
  6. 我们需要定义两个条件变量_full_empty描述阻塞队列的状态,进而才可以判断何时运行,何时等待;
  7. pthread_cond_wait除了会传入一个条件变量以外还会传入一个互斥锁,我们会发现,我们是在临界区中进行等待的,我们此时还处于持有锁状态,pthread_cond_wait第二个参数意义就在于成功调用wait之后,传入的锁,会被自动释放,当被唤醒的时候,就会自动获取线程锁;

判断是否满足生产消费条件时不能用if,而应该用while:

  1. pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。
  2. 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。
  3. 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。

在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据:

#include "BlockQueue.hpp"

void* consumer(void* args)
{
    BlockQueue<int>* bqueue = (BlockQueue<int>*)args;

    while(true)
    {
        int a;
        bqueue->pop(a);
        std::cout << "consumer:" << a << std::endl;
        sleep(1);
    }

    return nullptr;
}

void* productor(void* args)
{
    BlockQueue<int>* bqueue = (BlockQueue<int>*)args;
    int a = 1;
    while(true)
    {
        bqueue->push(a);
        std::cout << "productor:" << a << std::endl;
        a++;
        sleep(1);
    }

    return nullptr;
}

int main()
{
    pthread_t c, p;
    
    BlockQueue<int>* bq = new  BlockQueue<int>();
    
    //创建生产者消费者线程
    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);


    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

}

当生产者与消费者步调一致时,我们会发现生产者生产一个数据,消费者就会消费一个数据:
在这里插入图片描述

当生产者生产的快,消费者消费的慢时,阻塞队列满了就会导致生产者阻塞等待,只有当消费者被唤醒以后消费掉一个数据,此时生产者才会被唤醒继续生产数据:
在这里插入图片描述
当生产者生产的慢,消费者消费的快,因为最开始阻塞队列中并没有数据,所以消费者就会阻塞等待,当生产者生产一个数据以后,消费者就会被唤醒消费一个数据,然后生产者继续被唤醒生产数据,消费者消费数据,步调保持一致:
在这里插入图片描述
当我们满足某一条件时再唤醒对应的生产者或消费者,比如当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产。

// 向阻塞队列中插入数据(生产者)
void push(const T &in)
{
    // 加锁
    pthread_mutex_lock(&_mtx);

    while (isQueueFull())
    {
        // 如果生产者生产过程中数据满了,就阻塞等待
        pthread_cond_wait(&_full, &_mtx);
    }
    _bq.push(in);

    if (_bq.size() > _capacity / 2)
        // 唤醒消费者
        pthread_cond_signal(&_empty);

    // 解锁
    pthread_mutex_unlock(&_mtx);
}

// 向阻塞队列中获取数据(消费者)
void pop(T &out)
{
    // 加锁
    pthread_mutex_lock(&_mtx);

    while (isQueueEmpty())
    {
        // 如果消费者消费过程中数据空了,就阻塞等待
        pthread_cond_wait(&_empty, &_mtx);
    }

    out = _bq.front();
    _bq.pop();

    if (_bq.size() <= _capacity / 2)
        // 唤醒生产者
        pthread_cond_signal(&_full);

    // 解锁
    pthread_mutex_unlock(&_mtx);
}

在这里插入图片描述

我们仍然让生产者生产的快,消费者消费的慢。运行代码后生产者还是一瞬间将阻塞队列打满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于队列容器的一半时,才会唤醒生产者线程进行生产。

基于计算任务的生产者消费者模型

当然,实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,我们这样做只是为了测试代码的正确性。

由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。
例如,我们想要实现一个基于计算任务的生产者消费者模型,此时我们只需要定义一个Task类,这个类当中需要包含一个func_t成员函数:

#pragma once

#include <iostream>
#include <functional>

typedef std::function<int(int, int)> func_t;

class Task
{
public:
    Task()
    {
    }

    Task(int x, int y, func_t func) : _x(x), _y(y), _func(func)
    {
    }

    ~Task()
    {
    }

    int operator()()
    {
        return _func(_x, _y);
    }

public:
    int _x;
    int _y;
    func_t _func;
};

同时我们也可以将锁进行一个封装,采用RAII形式的加锁解锁风格,创建锁对象自动调用构造函数加锁,除了作用域自动调用析构函数解锁。

#pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *mtx) : _mtx(mtx)
    {
    }

    void lock()
    {
        std::cout << "需要进行加锁" << std::endl;
        pthread_mutex_lock(_mtx);
    }

    void unlock()
    {
        std::cout << "需要进行解锁" << std::endl;
        pthread_mutex_unlock(_mtx);
    }
    ~Mutex()
    {
    }

private:
    pthread_mutex_t *_mtx;
};

class LockGuard
{
public:
    LockGuard(Mutex mtx) :_mtx(mtx)
    {
        _mtx.lock();
    }

    ~LockGuard()
    {
        _mtx.unlock();
    }
private:
    Mutex _mtx;
};

此时我们的BlockQueue.hpp中插入和获取数据代码就可以优化为:

// 向阻塞队列中插入数据(生产者)
void push(const T &in)
{
    LockGuard lockguard(&_mtx);

    while (isQueueFull())
    {
        // 如果生产者生产过程中数据满了,就阻塞等待
        pthread_cond_wait(&_full, &_mtx);
    }
    _bq.push(in);

    // 唤醒消费者
    pthread_cond_signal(&_empty);

}

// 向阻塞队列中获取数据(消费者)
void pop(T &out)
{
    LockGuard lockguard(&_mtx);

    while (isQueueEmpty())
    {
        // 如果消费者消费过程中数据空了,就阻塞等待
        pthread_cond_wait(&_empty, &_mtx);
    }

    out = _bq.front();
    _bq.pop();

    // 唤醒生产者
    pthread_cond_signal(&_full);
}

运行代码,当生产者向阻塞队列中写入一个数据后,随即消费者就会被唤醒,获取数据,也就是进行计算操作:
在这里插入图片描述
同样,我们也可以创建多个线程进行计算:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1112413.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ChatGPT AIGC自动生成多条件复杂计算函数

在Excel中经常会遇到多条件判断,根据不同的条件与内容显示不同的值。 例如: 需要给每个员工根据入职年限,员工等级,满意度等维度给员工发年终奖。 这在职场办公过程中经常要面临的一个问题。如销售额达到多少,取多少提成,如学生成绩在什么区间是设置为优秀还是良好等一…

Windows开启telnet功能

打开控制面板&#xff0c;找到「程序和功能」&#xff0c;点击「启动或关闭Windows功能」 勾选「Telnet客户端」 点击确定&#xff0c;等待Windows完成设置。 然后就可以啦~ 今日金句&#xff1a; 煞笔给我退退退&#xff01;&#xff01;&#xff01;

ELK 单机安装

一丶软件下载 elasticsearch: https://www.elastic.co/downloads/past-releases kibana: https://www.elastic.co/downloads/past-releases 选择对应的版本的下载即可 二、es 安装es比较简单 rpm -ivh elasticsearch-2.4.2.rpm 修改配置文件 /etc/elasticsearch/elas…

基于SSM+Vue的体育馆管理系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

vue3插件开发,上传npm

创建插件 在vue3工程下&#xff0c;创建组件vue页: toolset.vue。并设置组件名称。注册全局组件。新建index.js文件。内容如下&#xff0c;可在main.js中引入index.js&#xff0c;注册该组件进行测试。![在这里插入图片描述](https://img-blog.csdnimg.cn/a3409d2cbeec41c797d5…

大数据Flink(九十九):SQL 函数的解析顺序和系统内置函数

文章目录 SQL 函数的解析顺序和系统内置函数 一、​​​​​​​SQL 函数

k8s kubernetes 1.23.6 + flannel公网环境安装

准备环境&#xff0c;必须是同一个云服务厂商&#xff0c;如&#xff1a;华为&#xff0c;阿里、腾讯等&#xff0c;不要存在跨平台安装K8S&#xff0c;跨平台安装需要处理网络隧道才能实现所有节点在一个网络集群中&#xff0c;这里推荐使用同一家云服务厂商安装即可 这里使用…

微信小程序实现类似于 vue中ref管理调用子组件函数的方式

微信小程序中确实有类似于 vue 中 ref管理子组件的方式、 这里 我给子组件定义了一个 class 只要是 css选择器拿得到的 都没什么问题 但你要保证唯一性 建议前端开发还是慎重一点 就算是不能重复也尽量用class 因为id总还是有风险的 然后 我在子组件中顶一个了一个函数 start…

AQS中lock源码解析

什么是AQS&#xff1f; 就是基于双向链表CAS实现的锁的一种机制或者方法思想。就是AbstractQueuedSynchronizer&#xff0c;是Java并发包下的一个基类基于AQS实现的同步器包括&#xff1a;ReentrantLock、CountDownLatch、Samaphone、FutureTask、ReentrantWriteLock Abstrac…

基于nodejs+vue云旅青城系统

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

Qt ModelViewDelegate(模型-视图-代理) 介绍和使用

一、Model (模型) 介绍 Qt Model 是 Qt 的一个重要组件&#xff0c;用于管理和展示数据。它是 Qt 的 Model/View 架构的核心部分&#xff0c;用于将数据模型与其视图相分离&#xff0c;实现数据的高效处理和可视化呈现。 Qt Model 可以理解成一组数据结构&#xff0c;其中包含…

Docker容器技术实战1

1、docker容器 docker相当于传统的货运集装箱 虚拟机&#xff08;Virtual Machine&#xff0c;VM&#xff09;是一个完整的虚拟操作系统和硬件环境。它的工作原理是通过在一台物理主机上使用虚拟化软件来创建多个虚拟机实例&#xff0c;每个实例都可以运行独立的操作系统和应用…

01-初识HTML和CSS

1.HTML与CSS 1.1.什么是HTML&#xff1f;什么是CSS&#xff1f; HTML是HyperText Markup Language(超文本标记语言) ​ 它不是一种编程语言&#xff0c;而是一种标记语言&#xff0c;用于告诉浏览器如何构造你的页面。它可以由一系列HTML元素组合成web开发人员想要的简单或者…

扬帆起航:许战海方法论日文版正式发布

近日&#xff0c;中国头部战略咨询机构‘许战海咨询’最新研究成果《中国汽车行业新能源转型战略》行业白皮书日文版&#xff0c;即将在日本发布。同时发布的日文版核心方法论白皮书还有《主品牌进化战略》、《第二招牌增长战略》、《链主品牌&#xff1a;制造业的竞争之王》等…

Python特征分析重要性的常用方法

前言 特征重要性分析用于了解每个特征(变量或输入)对于做出预测的有用性或价值。目标是确定对模型输出影响最大的最重要的特征&#xff0c;它是机器学习中经常使用的一种方法。 为什么特征重要性分析很重要? 如果有一个包含数十个甚至数百个特征的数据集&#xff0c;每个特征…

C++中多态的原理【精华】

虚函数表 通过一道题我们先感受一下编译器针对多态的处理 #include <iostream> using namespace std;class Base { public:virtual void Func1(){cout << "Func1()" << endl;} private:int _b 1;char _c };int main() {cout << sizeof(B…

Nginx的请求处理流程

左端有WEB、EMAIL及TCP三种流量&#xff0c;而绿色方框里边使用非阻塞的事件驱动处理引擎进行接收这三种流量&#xff0c;所以需要状态机进行很好地识别处理。当状态机识别需要访问静态资源&#xff0c;那么就需要到硬盘里边获取&#xff1b;如果是反向代理的话&#xff0c;可以…

记录--怎么写一个可以鼠标控制旋转的div?

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 说在前面 鼠标控制元素旋转在现在也是一个很常见的功能&#xff0c;让我们从实现div元素的旋转控制开始来了解元素旋转的具体原理和实现方法吧。 效果展示 体验地址 code.juejin.cn/pen/7290719… 实现…

go-logger日志组件分割日志

功能 支持同时输出到 console, file, url命令行输出字体可带颜色文件输出支持根据 文件大小&#xff0c;文件行数&#xff0c;日期三种方式切分文件输出支持根据日志级别分别保存到不同的文件支持异步和同步两种方式写入支持 json 格式化输出代码设计易扩展&#xff0c;可根据…

C的魅力在于指针

原有的adrv9025 代理框架很好用,在其原有的平台上做改进