【Linux】生产者消费者模型 -- RingQueue

news2024/11/27 1:21:08

文章目录

  • 1. 生产者消费者模型的理解
    • 1.1 生产者消费者模型的概念
    • 1.2 生产者消费者模型的特点
    • 1.3 生产者消费者模型的优点
  • 2. 基于BlockQueue的生产者消费者模型

1. 生产者消费者模型的理解

1.1 生产者消费者模型的概念

生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过这个容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器中,消费者也不用找生产者要数据,而是直接从这个容器里取数据,这个容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个容器实际上就是用来给生产者和消费者解耦的。
在这里插入图片描述

1.2 生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下;

  • 三种关系:生产者和生产者(互斥关系),消费者和消费者(互斥关系),生产者和消费者(互斥关系,同步关系)
  • 两种角色:生产者和消费者。
  • 一个交易场所:通常指的是内存中的一段缓冲区。

我们用代码编写生产者消费者模型的时候,本质就是对这三个特点进行维护。

生产者和生产者,消费者和消费者,生产者和消费者,它们之间为什么存在互斥关系?

因为生产者和消费者之间的容器可能被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。

其中,所有的生产者和消费者都会竞争式地申请锁,因此生产者和生产者,消费者和生产者,生产者和消费者之间都存在互斥关系。

生产者和消费者之间为什么会存在同步关系?

  • 如果让生产者一直生产,那么当生产者产生的数据将容器塞满之后,生产者再生产数据就会生产失败。
  • 同理,如果让消费者一直消费,那么让容器中的数据被消费完后,消费者再进行消费就会消费失败。

虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,再让消费者进行消费。

1.3 生产者消费者模型的优点

  • 让生产者和消费者解耦
  • 支持并发
  • 支持忙闲不均

如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完之后才继续执行主函数的后序代码,因此函数调用本质是一种紧耦合。

对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合。

2. 基于BlockQueue的生产者消费者模型

在多线程编程中,阻塞队列(Block Queue)是一种常用于生产者和消费者模型的数据结构。

在这里插入图片描述

其与普通队列的区别在于:

  • 当队列为空时,从队列中获取元素的操作将会被阻塞,直到队列中放入了元素。
  • 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。

看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。

模拟实现基于阻塞队列的生产者消费者模型

阻塞队列实现的生产者消费者模型的基本代码如下(单生产者单消费者):

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>

static const int gmaxcap = 5;

template <class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap = gmaxcap)
        : _maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }

    // 输入型参数,我们一般设置为const &
    // 输出型参数,我们一般设置为 *
    // 输入输出型,我们一般设置为 &
    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        while (is_full())
        {
            // 生产条件不满足,无法生产,此时我们的生产者进行等待
            pthread_cond_wait(&_pcond, &_mutex);

        }
        _q.push(in);
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }

    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        while (is_empty())
        {
            pthread_cond_wait(&_ccond, &_mutex);
        }
        *out = _q.front();
        _q.pop();

        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_ccond);
        pthread_cond_destroy(&_pcond);
    }

private:
    bool is_empty()
    {
        return _q.empty();
    }

    bool is_full()
    {
        return _q.size() == _maxcap;
    }

private:
    std::queue<T> _q;
    int _maxcap; // 队列中元素的上限
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond; // 生产者对应的条件变量
    pthread_cond_t _ccond; // 消费者对应的条件变量
};

对于代码,有以下解释:

  • 由于我们实现的是单生产者,单消费者的生消模型,因此我们不需要维护生产者和生产者之间的关系,也不需要维护消费者和消费者之间的关系,我们只需要维护生产者和消费者之间的关系即可。
  • 这里设置BlockQueue存储数据的上线为5,当存储5组数据之后,生产者就不能再进程生产了,直到消费者消费一组数据之后,才可以继续生产。
  • 阻塞队列是生产者和消费者都能访问的临界资源,因此我们需要用互斥锁将其保护起来。
  • 生产者向队列中Push数据时,前提是队列里面有空间,若阻塞队列已经满了,那么此时该生产者线程就需要进行等待,直到阻塞队列中欧空间时再次将其唤醒。
  • 消费者线程要从阻塞队列中Pop数据时,前提是阻塞队列里面有数据,若阻塞队列为空,那么此时该消费者线程就需要先进行等待,直到阻塞队列中有新的数据时再将其唤醒。
  • 因此我们在这里用到两个条件变量,一个条件变量用来描述队列为空,一个条件变量用来描述队列为满。
  • 不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区之后再判断是否满足生产或消费条件的,如果对应条件不满足,那么对应线程就会被挂起。但此时线程是拿着锁的,为了避免死锁问题,在调用pthread_cond_wait函数时就需要传入当前线程的互斥锁,此时当该线程被挂起时就会自动释放互斥锁,而当线程被唤醒时就又会自动获取互斥锁。
  • 当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在empty条件变量下进行等待,因此当生产者生产完数据后需要唤醒在empty条件变量下等待的消费者线程。

判断是否满足生产消费条件时不能用if,而应该用while:

  • pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会进行往后执行。
  • 其次,在多消费者情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。
  • 为了避免上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。

主函数中的代码如下:

#include "BlockQueue.hpp"
#include <unistd.h>
#include <sys/types.h>
#include <ctime>
#include <cstdlib>

void* consumer(void* bq_)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(bq_);
    while (true)
    {
        // 消费活动
        int data;
        bq->pop(&data);
        std::cout << "消费数据:" << data << std::endl;
    }
    return nullptr;
}

void* productor(void* bq_)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(bq_);

    while (true)
    {
        // 生产活动
        int data = rand() % 10 + 1;
        bq->push(data);
        std::cout << "生产数据:" << data << std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueue<int>* bq = new BlockQueue<int>();
    pthread_t c, p;
    // 传入bq,让两个线程看到同一份阻塞队列
    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

	delete bq;
    return 0;
}

当生产者生产很快,消费者消费很慢时,在生产者生生产的数据满了之后就会阻塞下来等待消费者,最终它们的步调相同。若生产者生产慢,消费者消费快也同理。

基于计算任务的生产者消费者模型

实际使用生产者消费者模型可不是简单地让生产者生产一个数据让消费者打印,我们这样做只是为了测试代码的正确性。

由于我们将BlockQueue中存储的数据进行了模板化,此时就可以让BlockQueue当中存储其他类型的数据。

例如,我们可以实现一个基于计算任务的生产消费模型,此时我们只需定义一个Task类,这个类当中需要包含一个Run成员,该函数代表着我们想让消费者如何处理拿到的数据。

#pragma once
#include <iostream>

class Task
{
public:
	Task(int x = 0, int y = 0, int op = 0)
		: _x(x), _y(y), _op(op)
	{}
	~Task()
	{}
	void Run()
	{
		int result = 0;
		switch (_op)
		{
		case '+':
			result = _x + _y;
			break;
		case '-':
			result = _x - _y;
			break;
		case '*':
			result = _x * _y;
			break;
		case '/':
			if (_y == 0){
				std::cout << "Warning: div zero!" << std::endl;
				result = -1;
			}
			else{
				result = _x / _y;
			}
			break;
		case '%':
			if (_y == 0){
				std::cout << "Warning: mod zero!" << std::endl;
				result = -1;
			}
			else{
				result = _x % _y;
			}
			break;
		default:
			std::cout << "error operation!" << std::endl;
			break;
		}
		std::cout << _x << _op << _y << "=" << result << std::endl;
	}
private:
	int _x;
	int _y;
	char _op;
};

主函数代码如下:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <sys/types.h>
#include <ctime>
#include <cstdlib>

void* consumer(void* bq_)
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(bq_);
    while (true)
    {
        // 消费活动
        Task t;
        bq->pop(&t);
        t.Run();
    }
    return nullptr;
}

void* productor(void* bq_)
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(bq_);
    const char* arr = "+-*/%";

    while (true)
    {
        // 生产活动
        int x = rand() % 100 + 1;
        int y = rand() % 100 + 1;
        char op = arr[rand() % 5];
        Task t(x, y, op);
        bq->push(t);
        std::cout << "producer task done" << std::endl;
        sleep(1);
    }
    return nullptr;
}

运行代码,此时消费者执行的就是计算任务了。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/764438.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从iPhone恢复已删除音视频的5种主要方法

“我需要从iPhone恢复已删除的音视频。我真的很喜欢我的音视频文件。我玩了很多封面&#xff0c;并检查听我可以改进的地方和不可以改进的地方。 iPhone是我完成这项任务的首选手机&#xff0c;因为我喜欢保持非常简单&#xff0c;我喜欢听我的iPhone。但是&#xff0c;我确实删…

4.Cesium中实体Entity的增删改查及性能优化(超详细)

前言 Cesium 作为一个功能强大的 WebGL 三维地球仪库,内置了丰富的三维地图展示能力。在 Cesium 中,我们可以通过 Entity(实体)在三维场景中添加和控制各种三维对象,如点、线、面、模型等。本文将介绍 Cesium 中实体的增删改查操作。 概述 添加到场景中的实体都保存在 viewer.…

【AI绘画】Stable-Diffusion-Webui本地部署-简单绘画图片

这里写目录标题 前言一、Stable Diffusion是什么&#xff1f;二、安装stable-diffusion-webui1. python安装2. 下载模型3. 开始安装&#xff1a;4. 汉化&#xff1a;5. 模型使用&#xff1a;6. 下载新模型&#xff1a;7. 基础玩法 三、总结 前言 本文将借助stable-diffusion-w…

【idea】的一些使用指南

一、serializable自动生成id 1.打开File菜单&#xff0c;选择Settings选项 2.打开Editor->Inspections 3.在右边的搜索框中输入serialVersionUID关键字&#xff0c;出现以下选项&#xff0c;勾选"Serializable class without serialVersionUID"&#xff0c;然后别…

攻不下dfs不参加比赛(十)

标题 为什么练dfs题目总结为什么练dfs 相信学过数据结构的朋友都知道dfs(深度优先搜索)是里面相当重要的一种搜索算法,可能直接说大家感受不到有条件的大家可以去看看一些算法比赛。这些比赛中每一届或多或少都会牵扯到dfs,可能提到dfs大家都知道但是我们为了避免眼高手低有…

非监督学习-K均值聚类-知识点扫盲

前言 在实际工作中&#xff0c;我们经常会遇到这样一类问题&#xff1a;给机器输入大量的特征数据&#xff0c;并期望机器通过学习找到数据中存在的某种共性特征或者结构&#xff0c;亦或是数据之间存在的某种关联。 例如&#xff0c;视频网站根据用户的观看行为对用户进行分组…

【MongoDB】SpringBoot整合MongoDB

【MongoDB】SpringBoot整合MongoDB 文章目录 【MongoDB】SpringBoot整合MongoDB0. 准备工作1. 集合操作1.1 创建集合1.2 删除集合 2. 相关注解3. 文档操作3.1 添加文档3.2 批量添加文档3.3 查询文档3.3.1 查询所有文档3.3.2 根据id查询3.3.3 等值查询3.3.4 范围查询3.3.5 and查…

8、gateway使用和原理

一、什么是Spring Cloud Gateway 1、网关简介 网关作为流量的入口&#xff0c;常用的功能包括路由转发&#xff0c;权限校验&#xff0c;限流等。 2、Gateway简介 Spring Cloud Gateway 是Spring Cloud官方推出的第二代网关框架&#xff0c;定位于取代 Netflix Zuul。相比 …

【iOS】编译与链接过程

前言 计算机语言分为&#xff1a;机器语言、汇编语言和高级语言。 高级语言又能分为&#xff1a;编辑语言、解释语言。 解释语言 解释语言编写的程序在每次运行时都需要通过解释器对程序进行动态解释和执行&#xff0c;即解释一条代码&#xff0c;执行一条代码。 优点&…

ADC 的初识

ADC介绍 Q: ADC是什么&#xff1f; A: 全称&#xff1a;Analog-to-Digital Converter&#xff0c;指模拟/数字转换器 ADC的性能指标 量程&#xff1a;能测量的电压范围分辨率&#xff1a;ADC能辨别的最小模拟量&#xff0c;通常以输出二进制数的位数表示&#xff0c;比如&am…

一百三十、海豚调度器——用DolphinScheduler定时调度HiveSQL任务

一、目标 用海豚调度器对Hive数仓各层数据库的SQL任务进行定时调度。比如&#xff0c;DWD层脱敏清洗表的动态插入数据、DWS层指标表的动态插入数据 二、工具版本 1、海豚调度器&#xff1a;apache-dolphinscheduler-2.0.5-bin.tar.gz 2、Hive&#xff1a;apache-hive-3.1.2…

随手笔记——Sophus的基本使用方法

随手笔记——Sophus的基本使用方法 说明CMakeLists.txt补充&#xff1a;关于 ADD_SUBDIRECTORY 的使用使用CMakeLists执行顺序 源代码 说明 Sophus 库支持SO(3) 和SE(3)&#xff0c;此外还含有二维运动 SO(2)&#xff0c;SE(2) 以及相似变换 Sim(3) 的内容。它是直接在 Eigen …

数据结构--图的存储邻接矩阵法

数据结构–图的存储邻接矩阵法 无向图&#xff1a; 有向图&#xff1a; #define MaxVerTexNum 100 //顶点数目的最大值 typedef struct {char vex[MaxVerTexNum]; //顶点表int Edge[MaxVerTexNum][MaxVerTexNum]; //邻接矩阵&#xff0c;边表int vexnum, arcnum; //图的当前顶…

最新 robot framework安装

相信大家对robot framework并不陌生&#xff0c;它是一个基于Python语言&#xff0c;用于验收测试和验收测试驱动开发&#xff08;ATDD&#xff09;的通用测试自动化框架&#xff0c;提供了一套特定的语法&#xff0c;并且有非常丰富的测试库。 ### [Python](https://www.pytho…

gogs的自定义配置

在 GOGS 下载并安装后&#xff0c;在程序目录下建立一个custom/conf/app.ini的配置文件&#xff0c;内容如下&#xff1a; APP_NAME Gogs # APP名字 RUN_USER git # 启动用户&#xff0c;设置后只能以此账号启动gogs RUN_MODE prod[database] DB_TYPE mysql HOST 1…

联通 Flink 实时计算平台化运维实践

摘要&#xff1a;本文整理自联通数科实时计算团队负责人、Apache StreamPark Committer 穆纯进在 Flink Forward Asia 2022 平台建设专场的分享&#xff0c;本篇内容主要分为四个部分&#xff1a; 实时计算平台背景介绍 Flink 实时作业运维挑战 基于 StreamPark 一体化管理 …

智能安全配电装置在老旧建筑防火中的应用 安科瑞 许敏

【摘要】现代社会的发展离不开电能&#xff0c;随着电能应用的广泛性&#xff0c;对用电安全有了更高的要求。近些年来&#xff0c;用电安全形式严峻&#xff0c;尤其是一些老旧建筑中因用电而引起的火灾事故频发&#xff0c;造成一系列严重的损失&#xff0c;严重影响着民众的…

rancher部署

Rancher 管理 Kubernetes 集群 //Rancher 简介 Rancher 是一个开源的企业级多集群 Kubernetes 管理平台&#xff0c;实现了 Kubernetes 集群在混合云本地数据中心的集中部署与管理&#xff0c; 以确保集群的安全性&#xff0c;加速企业数字化转型。超过 40000 家企业每天使用 …

工控机设备安全

工控设备安全现状 工业控制系统是支撑国民经济的重要设施&#xff0c;是工业领域的神经中枢。现在工业控制系统已经广泛应用于电力、通信、化工、交通、航天等工业领域&#xff0c;支撑起国计民生的关键基础设施。 随着传统的工业转型&#xff0c;数字化、网络化和智能化的工…

C++11(3)——lambda表达式

目录 1 C98中的一个例子 2 lambda表达式 3 lambda表达式语法 4 函数对象与lambda表达式 1 C98中的一个例子 在C98中&#xff0c;如果想要对一个数据集合中的元素进行排序&#xff0c;可以使用std::sort方法。 如果待排序元素为自定义类型&#xff0c;需要用户定义排序时的…