基于 Linux 下的生产者消费者模型

news2025/1/12 8:53:47

目录

    • 传统艺能😎
    • 概念😘
    • 特点😍
    • 优点😁
    • 基于阻塞队列的生产者消费者模型🤣
    • 模拟实现😂
      • 基于计算任务的生产者消费者模型👌

传统艺能😎

小编是双非本科大二菜鸟不赘述,欢迎米娜桑来指点江山哦
在这里插入图片描述
1319365055

🎉🎉非科班转码社区诚邀您入驻🎉🎉
小伙伴们,满怀希望,所向披靡,打码一路向北
一个人的单打独斗不如一群人的砥砺前行
这是和梦想合伙人组建的社区,诚邀各位有志之士的加入!!
社区用户好文均加精(“标兵”文章字数2000+加精,“达人”文章字数1500+加精)
直达: 社区链接点我


在这里插入图片描述

概念😘

生产者消费者模型是指通过一个容器来解决生产者和消费者的强耦合问题,两者之间不直接通讯,而是通过容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接放到容器当中,消费者也不用找生产者要数据,而是直接从这个容器里取数据,这个容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力,实际上就是用来解耦生产者和消费者的

在这里插入图片描述

特点😍

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

三种关系 \color{red} {三种关系} 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
两种角色 \color{red} {两种角色} 两种角色: 生产者和消费者。(通常由进程或线程承担)
一个交易场所 \color{red} {一个交易场所} 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)
我们用代码编写生产者消费者模型的时候,本质就是对这三个特点进行维护:

  1. 生产者和生产者的关系
  2. 消费者和消费者的关系
  3. 生产者和消费者的关系

生产者和消费者之间的容器可能会被多个执行流同时访问,因此需要将该临界资源用互斥锁保护起来。其中所有的生产者和消费者都会竞争式的申请锁,因此三种关系间都存在互斥关系, 那为什么生产者和消费者又存在同步关系? \color{red} {那为什么生产者和消费者又存在同步关系?} 那为什么生产者和消费者又存在同步关系?

生产者生产的数据将容器塞满后,数据生产就会生产失败,同理,容器当中的数据被消费完后,消费者就会消费失败。这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,就像管道通信一样。

互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来

优点😁

解耦
支持并发
支持分配不均

如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才执行后续代码,因此函数调用本质上是一种强耦合。对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但消费者生产者可以同时进行活动,因此生产者消费者模型本质是一种松耦合

基于阻塞队列的生产者消费者模型🤣

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

在这里插入图片描述

与普通的队列的区别在于:当队列为空时,获取元素的操作将会被阻塞,直到队列中放入元素。
当队列满时,放入元素时会被阻塞,直到有元素从队列中取出。

模拟实现😂

为了方便理解,下面我们以单生产者、单消费者为例进行,其中的 BlockQueue 就是生产者消费者模型当中的交易场所,我们可以用 STL 库当中的 queue 进行实现

在这里插入图片描述

#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

#define NUM 5

template<class T>
class BlockQueue
{
private:
	bool IsFull()
	{
		return _q.size() == _cap;
	}
	bool IsEmpty()
	{
		return _q.empty();
	}
public:
	BlockQueue(int cap = NUM)
		: _cap(cap)
	{
		pthread_mutex_init(&_mutex, nullptr);
		pthread_cond_init(&_full, nullptr);
		pthread_cond_init(&_empty, nullptr);
	}
	~BlockQueue()
	{
		pthread_mutex_destroy(&_mutex);
		pthread_cond_destroy(&_full);
		pthread_cond_destroy(&_empty);
	}
	//向阻塞队列插入数据(生产者调用)
	void Push(const T& data)
	{
		pthread_mutex_lock(&_mutex);
		while (IsFull()){
			//不能进行生产,直到阻塞队列可以容纳新的数据
			pthread_cond_wait(&_full, &_mutex);
		}
		_q.push(data);
		pthread_mutex_unlock(&_mutex);
		pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
	}
	//从阻塞队列获取数据(消费者调用)
	void Pop(T& data)
	{
		pthread_mutex_lock(&_mutex);
		while (IsEmpty()){
			//不能进行消费,直到阻塞队列有新的数据
			pthread_cond_wait(&_empty, &_mutex);
		}
		data = _q.front();
		_q.pop();
		pthread_mutex_unlock(&_mutex);
		pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
	}
private:
	std::queue<T> _q; //阻塞队列
	int _cap; //阻塞队列最大容器数据个数
	pthread_mutex_t _mutex;
	pthread_cond_t _full;
	pthread_cond_t _empty;
};

由于我们实现的是单生产者、单消费者的生产者消费者模型,因此我们只需要维护生产者和消费者之间的同步与互斥关系即可。将 BlockingQueue 当中存储的数据模板化,方便以后需要时进行复用。

这里设置 BlockingQueue 存储数据上限为 5,即阻塞队列中存储了五组数据时不能进行生产了,此时生产者被阻塞。阻塞队列是会被生产者和消费者同时访问的临界资源,因此需要用一把互斥锁将其保护起来

生产者线程要向阻塞队列当中 push 数据,前提是有空间,若阻塞队列已经满了,那么此时生产者要等到阻塞队列中有空间时再唤醒。同理,消费者线程要从阻塞队列当中 pop 数据,若阻塞队列为空,那么此时该消费者线程就要等到阻塞队列中有新的数据时再唤醒。

因此在这里我们需要用到两个条件变量 一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。 \color{red} {一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。} 一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。当阻塞队列满了的时候,生产者线程就应该在 full 条件变量下进行等待;当阻塞队列为空的时候,消费者线程就应该在 empty 条件变量下进行等待。

不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的,如果对应条件不满足,那么对应线程就会被挂起。但此时该线程是拿着锁的,为了避免死锁问题,在调用 pthread_cond_wait 函数时就需要传入手中的互斥锁,此时当该线程被挂起时就会自动释放手中的互斥锁,而当该线程被唤醒时又会自动获取到该互斥锁。

当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在 empty 条件变量下进行等待,因此需要唤醒在 empty 条件变量下等待的消费者线程。同理当消费者消费完一个数据后,意味着阻塞队列当中至少有一个空间,而此时可能有生产者线程正在 full 条件变量下进行等待,因此需要唤醒在 full 条件变量下等待的生产者线程。

判断是否满足生产消费条件时不能用 if,应该用 while

pthread_cond_wait 函数让当前执行流进行等待,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。其次,在多消费者的情况下,当生产者生产了一个数据后如果使用 pthread_cond_broadcast 函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。

为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。
在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据。

#include "BlockQueue.hpp"

void* Producer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//生产者不断进行生产
	while (true){
		sleep(1);
		int data = rand() % 100 + 1;
		bq->Push(data); //生产数据
		std::cout << "Producer: " << data << std::endl;
	}
}
void* Consumer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//消费者不断进行消费
	while (true){
		sleep(1);
		int data = 0;
		bq->Pop(data); //消费数据
		std::cout << "Consumer: " << data << std::endl;
	}
}
int main()
{
	srand((unsigned int)time(nullptr));
	pthread_t producer, consumer;
	BlockQueue<int>* bq = new BlockQueue<int>;
	//创建生产者线程和消费者线程
	pthread_create(&producer, nullptr, Producer, bq);
	pthread_create(&consumer, nullptr, Consumer, bq);

	//join生产者线程和消费者线程
	pthread_join(producer, nullptr);
	pthread_join(consumer, nullptr);
	delete bq
	return 0;
}

阻塞队列要让生产者线程向队列中 push 数据,让消费者线程从队列中 pop 数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行例程的参数进行传入。为了便于观察,我们将数据进行打印输出

由于代码中生产者是每隔一秒生产一个数据,消费者也是,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的:

在这里插入图片描述
我们也可以让生产者不停的进行生产,而消费者每隔一秒进行消费,营造供大于求的模型:

void* Producer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//生产者不断进行生产
	while (true){
		int data = rand() % 100 + 1;
		bq->Push(data); //生产数据
		std::cout << "Producer: " << data << std::endl;
	}
}
void* Consumer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//消费者不断进行消费
	while (true){
		sleep(1);
		int data = 0;
		bq->Pop(data); //消费数据
		std::cout << "Consumer: " << data << std::endl;
	}
}

此时由于生产者生产的很快,运行代码后一瞬间阻塞队列就灌满了,此时想要再生产就只能在 full 条件变量下进行等待,直到消费者消费完一个数据后,生产者才会被唤醒,生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致了

在这里插入图片描述
我们也可以让生产者每隔一秒进行生产,而消费者不停的进行消费。营造供不应求的模型:

void* Producer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//生产者不断进行生产
	while (true){
		sleep(1);
		int data = rand() % 100 + 1;
		bq->Push(data); //生产数据
		std::cout << "Producer: " << data << std::endl;
	}
}
void* Consumer(void* arg)
{
	BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	//消费者不断进行消费
	while (true){
		int data = 0;
		bq->Pop(data); //消费数据
		std::cout << "Consumer: " << data << std::endl;
	}
}

或者也可以当数据大于队列容量的一半时,再唤醒消费者线程;当数据小于队列容器的一半时,再唤醒生产者线程:

//向阻塞队列插入数据(生产者调用)
void Push(const T& data)
{
	pthread_mutex_lock(&_mutex);
	while (IsFull()){
		//不能进行生产,直到阻塞队列可以容纳新的数据
		pthread_cond_wait(&_full, &_mutex);
	}
	_q.push(data);
	if (_q.size() >= _cap / 2){
		pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
	}
	pthread_mutex_unlock(&_mutex);
}
//从阻塞队列获取数据(消费者调用)
void Pop(T& data)
{
	pthread_mutex_lock(&_mutex);
	while (IsEmpty()){
		//不能进行消费,直到阻塞队列有新的数据
		pthread_cond_wait(&_empty, &_mutex);
	}
	data = _q.front();
	_q.pop();
	if (_q.size() <= _cap / 2){
		pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
	}
	pthread_mutex_unlock(&_mutex);
}

基于计算任务的生产者消费者模型👌

实际使用生产者消费者模型时并不是简单的让生产者生产一个数字让消费者打印,打印只是为了测试代码正确性。既然我们能将 BlockingQueue 当中存储的数据进行模板化,那也可以让 BlockingQueue 当中存储其他类型的数据。

例如实现一个基于计算任务的生产者消费者模型,此时我们只需要定义一个 Task 类,这个类当中需要包含一个 Run 成员函数,该函数代表我们想让消费者如何处理数据:

#pragma once
#include <iostream>

class Task
{
public:
	Task(int x = 0, int y = 0, int op = 0)
		: _x(x), _y(y), _op(op)
	{}
	~Task()
	{}
	void Run()
	{
		int result = 0;
		switch (_op)
		{
		case '+':
			result = _x + _y;
			break;
		case '-':
			result = _x - _y;
			break;
		case '*':
			result = _x * _y;
			break;
		case '/':
			if (_y == 0){
				std::cout << "Warning: div zero!" << std::endl;
				result = -1;
			}
			else{
				result = _x / _y;
			}
			break;
		case '%':
			if (_y == 0){
				std::cout << "Warning: mod zero!" << std::endl;
				result = -1;
			}
			else{
				result = _x % _y;
			}
			break;
		default:
			std::cout << "error operation!" << std::endl;
			break;
		}
		std::cout << _x << _op << _y << "=" << result << std::endl;
	}
private:
	int _x;
	int _y;
	char _op;
};

此时生产者放入的数据就是一个 Task 对象,而消费者拿到 Task 对象后,就可以用该对象调用 Run 函数进行数据处理:

void* Producer(void* arg)
{
	BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
	const char* arr = "+-*/%";
	//生产者不断进行生产
	while (true){
		int x = rand() % 100;
		int y = rand() % 100;
		char op = arr[rand() % 5];
		Task t(x, y, op);
		bq->Push(t); //生产数据
		std::cout << "producer task done" << std::endl;
	}
}
void* Consumer(void* arg)
{
	BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
	//消费者不断进行消费
	while (true){
		sleep(1);
		Task t;
		bq->Pop(t); //消费数据
		t.Run(); //处理数据
	}
}

运行代码,当阻塞队列满后消费者被唤醒,此时消费者在执行的就是计算任务,当阻塞队列当中的数据被消费到低于一定阈值后又会唤醒生产者进行生产:

在这里插入图片描述
此后我们想让生产者消费者模型处理某一种任务时,就只需要提供对应的 Task 类,然后类提供一个处理任务的成员函数即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/548314.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

chatgpt赋能Python-python3_date

Python 3 Date介绍 Python 3是一种非常流行的编程语言&#xff0c;其中涉及到日期处理的功能非常强大。Python 3支持处理日期、时间和时间刻度&#xff0c;因此可以在各种情况下使用它来管理日期。 日期格式 Python 3支持多种日期格式&#xff0c;如下所示&#xff1a; “Y…

不怕得罪人地推荐这9本黑客书籍

[利益声明] 1、这9本都和我有些关系或缘分&#xff0c;也是我至少过了一遍的&#xff0c;虽然并没都仔细推敲&#xff0c;但是这些书&#xff0c;我还是不得不点个赞。 2、其中一本是我和 xisigr 写的:-)我并不觉得在这不能推荐&#xff0c;因为这本书毕竟卖得很好。 然后&am…

torch.nn.functional.normalize参数说明

torch.nn.functional.normalize参数说明 函数定义参数及功能官方说明三维数据实例解释参数dim0参数dim1参数dim2参数dim-1 参考博文及感谢 函数定义 torch.nn.functional.normalize(input, p2.0, dim1, eps1e-12, outNone) # type: (Tensor, float, int, float, Optional[Tens…

chatgpt赋能Python-python3_9怎么安装jieba库

Python3.9怎么安装jieba库 随着大数据时代的到来&#xff0c;中文分词是一个愈发重要的问题。而jieba是一个基于Python的中文分词工具包&#xff0c;具有高速、易用、解耦的特点&#xff0c;广受开发者的青睐。本文将介绍如何在Python3.9环境下安装jieba库。 什么是jieba库 …

微服务: Seata AT 分布式事务以及配置方式(上篇)

目录 前言简介: 1. 安装seata-at -> 1.1 先看版本, 全局搜一下 -> 1.2 版本说明 alibaba/spring-cloud-alibaba Wiki -> 1.3 选择seata-at版本 -> 1.4 下载后按照下图进行创建文件 ---> 1.4.0 先在nacos创建命名空间seata ---> 1.4.1 registry.conf…

Chrome 的骑士盾,谷歌 Security Princess 访谈

童话故事里的公主都有一种需要被保护的感觉&#xff0c;就像马里奥大叔在这么多年来都要在库巴手上拯救出碧姬公主一样。不过在谷歌的这位 Security Princess 却手执盾牌&#xff0c;守护着大家的 Chrome 浏览器免受恶意程序攻击。小编这次就乘着世界网络安全日的机会&#xff…

微信小程序-生命周期

为什么今天突然总结一下微信小程序的生命周期呢&#xff1f;因为突然发现这个知识点忘得有点干净。所以今天就看一下微信小程序的生命周期是怎么个事吧&#xff01; 目录 生命周期 生命周期的分类 生命周期函数的作用 生命周期函数的分类 生命周期是指一个对象从创建->…

Docker -- m1芯片 macOS 安装 nginx - 03

m1芯片 macOS 安装 nginx 一、安装docker提前准备二、下载nginx相关镜像三、运行相关容器四、运行并验证 一、安装docker提前准备 查看 d o c k e r \color{#FF7D00}{docker} docker版本&#xff1a;在 c o m m e n t \color{#FF7D00}{comment} comment 中输入 docker -version…

小红薯笔记/帖子采集工具

小红书【笔记/帖子】采集工具 链接&#xff1a; http://106.53.68.168:9920/xhs-keyword-spider 规则及操作 &#xff08;1&#xff09;规则&#xff1a; 按照关键词抓取规则&#xff1a;标题中或者正文内容中包含该关键词都能被抓取下来。多种搜索模式可选&#xff0c;分别…

字节跳动10年经验,10W字228道软件测试经典面试题总结(附答案)

前言 最近有很多粉丝问我&#xff0c;有什么方法能够快速提升自己&#xff0c;通过阿里、腾讯、字节跳动、京东等互联网大厂的面试&#xff0c;我觉得短时间提升自己最快的手段就是背面试题&#xff0c;最近总结了软件测试常用的面试题&#xff0c;分享给大家&#xff0c;希望…

【mpvue】小程序开发入门

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍mpvue的使用。 学其所用&#xff0c;用其所学。——梁启超 欢迎来到我的博客&#xff0c;一起学习知识&#xff0c;共同进步。 &#x1f95e;喜欢的朋友可以关注一下&#xff0c;下次更…

Atlassian攻略:如何将Jira和Confluence的数据平稳迁移上云

迁移到云端相当于一次专业的冒险旅⾏。过程中肯定会经历一些颠簸&#xff0c;但只要有正确的心态和充分的准备&#xff0c;您就能完美应对。最终的目的地一定会让你感觉值得。当Atlassian调查了最近迁移的客户时&#xff0c;有89%的客户表示在他们不到6个月的时间内就意识到了迁…

chatgpt赋能Python-python3_9_7怎么换行

Python3.9.7是一款强大的编程语言&#xff0c;它具有许多优点&#xff0c;例如易于学习和使用&#xff0c;适用于不同的应用程序&#xff0c;以及具有丰富的第三方库支持。但是&#xff0c;许多人可能会面临一个问题&#xff1a;如何在Python3.9.7中正确换行&#xff1f; 在本…

一篇文章告诉你如何入门黑客技术

01 准备 当你决定做要开始学习一个新的领域时&#xff0c;你需要考虑以下几个问题。 1&#xff09;要考虑清楚你为何要学这个 说白了就是你的动机是什么&#xff0c;如果你的动机是不可持续的&#xff0c;例如盗个QQ&#xff08;甚至是挖个系统0Day漏洞&#xff09;&#x…

【数据分享】2020年全国10m分辨率土地覆盖数据

土地覆盖数据是我们在各项研究中都非常常用的数据&#xff01;之前我们分享过多种精度的土地覆盖数据&#xff0c;包括&#xff1a;两种30米精度的土地覆盖数据——2000\2010\2022年的GlobeLand地表土地覆盖数据和1990-2021年的CLDC土地覆盖数据&#xff1b;此外还分享了两种10…

IDEA spring boot maven 项目搭建

1.打开idea后选择file->new->project 2.选择maven,选择jdk&#xff0c;并且下一步next。 3.选择项目存放位置以及项目名称。 至此一个maven项目就建好了。

第三十三章 使用Redux管理状态

Redux&#xff08;全称为Redux&#xff09;是一个基于状态管理的JavaScript库&#xff0c;它可以用来构建可重用的、可维护的代码。Redux主要用于处理复杂的应用程序中的状态管理&#xff0c;它能够自动地处理应用程序中的更改&#xff0c;并在需要时更新视图。 Redux使用一种被…

Oracle MRP补丁

参考文档&#xff1a; Oracle Database Oracle Database Patch Maintenance, Release 19c and Later Releases Introducing Monthly Recommended Patches (MRPs) and FAQ (Doc ID 2898740.1) - Sunsetting of 19c RURs and FAQ (Doc ID 2898381.1). Primary Note for D…

Sui基金会联合Tencent Cloud和Numen于5月24日在香港举办生态交流会

Sui生态工作坊及交流会将于5月24日在香港举行&#xff0c;本次活动旨在提升Web3产业对Sui生态的认识&#xff0c;并为生态中的开发者搭建交流的平台&#xff0c;促进团队之间的合作。在本次的交流会中&#xff0c;您还可以了解到Sui基金会对Web3生态发展愿景不遗余力的支持、Te…

深度学习课程:手写体识别示例代码和详细注释

Pytorch 的快速入门&#xff0c;参见 通过两个神经元的极简模型&#xff0c;清晰透视 Pytorch 工作原理。本文结合手写体识别项目&#xff0c;给出一个具体示例和直接关联代码的解释。 1. 代码 下面代码展示了完整的手写体识别的 Python 程序代码。代码中有少量注释。在本文后…