【关于Linux中----生产消费模型】

news2025/2/25 10:44:49

文章目录

  • 一、生产消费模型
    • 1.1概念的引入
    • 1.2 321原则
  • 二、条件变量
    • 2.1概念的引入
    • 2.2理解条件变量
    • 2.3条件变量的使用
  • 三、基于BlockingQueue的生产者消费者模型
    • 3.1BlockingQueue的介绍
    • 3.2C++ queue模拟阻塞队列的生产消费模型
    • 3.3对生产消费任务的模拟封装
  • 四、遗留问题


一、生产消费模型

1.1概念的引入

众所周知,在多线程环境里,避免不了会出现多个执行流访问同一块共享资源的情况。但是,如果一个执行流长时间地持有锁,或者它抢占锁的能力更强,就很有可能导致其他执行流出现饥饿问题。所以,我们急切地需要一种模式来尽可能地平衡多个执行流访问公共资源的公平性,于是就出现了“生产消费模型”。

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

1.2 321原则

生产消费模型中的“交易场所”(阻塞队列)就是一块共享资源。既然是共享资源,就有可能同时被多个执行流访问,所以就必须想办法将其保护起来。

首先要知道生产者和生产者之间是互斥(竞争)关系,消费者和消费者之间也是如此,生产者和消费者之间j既有互斥关系(为了保证数据安全)又有同步关系(为了保证双方效率)。

以上内容可以总结为321原则

3种关系:生产者和生产者(互斥),消费者和消费者(互斥)生产者和消费者(互斥&&同步)
2中角色:生产者线程和消费者线程
1个交易场所:一段特定结构的缓冲区

所以,生产消费模型本质上就是维护好321原则,也就保证了共享资源的安全性和双方的效率。

由此也可以总结出生产消费模型特点:

  • 解耦----生产消费双方都通过共享资源提供和使用数据
  • 支持生产消费忙闲不均----共享资源具有缓存数据的能力
  • 提高效率

虽然共享资源具有缓存数据的能力,但是当生产者没有向缓冲区中写入数据时,消费者就只能等待数据;同样的道理,当缓冲区已经存满时,生产者就只能等消费者取走数据。所以,生产消费模型的所谓“提高效率”的特点到底体现在哪里?(这个问题下文中解释)


二、条件变量

2.1概念的引入

由于需要保证生产消费模型中个共享资源的安全,所以生产消费者之间一定有互斥关系,所以二者在访问共享资源时一定要持有锁,并且判断可不可以向其中写入或读取数据,然后再解锁。
所以,当生产者将数据放入缓冲区,直至缓冲区被填满,它下一次在向其中放入数据时就会先申请锁,再判断可不可以放入数据,不能放入数据,最后解锁离开。但是由于生产者申请锁的能力较强,就会一直先于消费者去访问共享资源,这就又导致了消费者的饥饿问题。
为了解决这个问题,就必须使用条件变量(一种数据类型):

在这里插入图片描述

跟互斥锁的接口很相像,使用前要先定义一个条件变量,再初始化,使用完之后释放条件变量,当然也可以直接将其定义为全局的。

下面再回到上面说的“导致消费者饥饿”的问题,我们可以加一个条件变量,让生产者和消费者都在满足条件的时候,才可以访问公共资源,这里就要用到一个接口:
在这里插入图片描述

这个接口的作用是,在不满足条件时,将某一线程挂起。

既然有挂起,当然需要另一个接口在线程满足条件时,将其唤醒:
在这里插入图片描述

2.2理解条件变量

还是使用上文中的例子,多个线程访问资源时,可能会出现一个线程频繁访问,而其他线程从未得到过访问机会的情况。
为了解决这个问题,需要设置一个条件变量,对访问资源的线程要做出限制,只有满足条件的线程才能访问资源。

也就是说,当条件不满足时,线程必须去某些定义好的条件变量上进行等待。

其中,当线程不满足条件时,会调用==pthread_cond_wait()这个接口,将自己放在该条件变量的等待队列中排队;当满足条件时,会调用pthread_cond_signal()==这个接口,将自己唤醒去访问资源。

2.3条件变量的使用

先来模拟一个场景:
两个线程进行抢票(不包含主线程),抢票过程暂时忽略,主线程每隔两秒唤醒一个条件变量下等待的线程。使用到的锁和条件变量直接使用默认初始值,并将其定义为全局变量。
那么,当程序运行起来时,一定会看到两个线程在抢票,并且一定会有先后顺序(虽然我们现在并不清楚谁先谁后),模拟代码如下:

#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>

int tickets=1000;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;

void* start_routine(void* args)
{
    std::string name=static_cast<const char*>(args);
    while(true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond,&mutex);
        std::cout<<name<<"->"<<tickets<<std::endl;
        tickets--;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    //通过条件变量来控制线程执行
    pthread_t t1,t2;
    pthread_create(&t1,nullptr,start_routine,(void*)"thread 1");
    pthread_create(&t2,nullptr,start_routine,(void*)"thread 2");

    while(true)
    {
        sleep(2);
        pthread_cond_signal(&cond);//唤醒该条件变量下等待的线程
        std::cout<<"main thread weakup one thread..."<<std::endl;
    }

    pthread_join(t1,nullptr);
     pthread_join(t2,nullptr);
    return 0;
}

Makefile内容如下:

Cond:Cond.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f Cond

运行结果如下:

[sny@VM-8-12-centos threadcon]$ ./Cond
main thread weakup one thread...thread 2->
1000
main thread weakup one thread...
thread 1->999
main thread weakup one thread...
thread 2->998
main thread weakup one thread...
thread 1->997
main thread weakup one thread...
thread 2->996
^C

可以看到,两个线程抢票时,是由明显的先后顺序的,因为两个线程在条件变量下是时刻在排队的。各位读者也可以复制粘贴代码自己运行一下试试。

上述唤醒线程是一个线程,当然上图中还有可以唤醒某一条件变量下的所有线程的接口,读者们可以自己试试。


三、基于BlockingQueue的生产者消费者模型

3.1BlockingQueue的介绍

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

在这里插入图片描述

3.2C++ queue模拟阻塞队列的生产消费模型

阻塞队列封装之后的代码如下:

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>

static const int gmaxcap=5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap=gmaxcap):_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }

    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        //1.判断
        if(is_full())
        {//生产条件不满足,无法生产,生产者进行等待
            pthread_cond_wait(&_pcond,&_mutex);
        }
        //2.这里一定没有满
        _q.push(in);
        //3.走到这里,阻塞队列里一定有数据了
        pthread_cond_signal(&_ccond);//唤醒消费者
        pthread_mutex_unlock(&_mutex);
    }

    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        //1.判断
        if(is_empty())
        {
            pthread_cond_wait(&_ccond,&_mutex);
        }
        //2.这里一定不为空
        *out=_q.front();
        _q.pop();
        //3.走到这里,阻塞队列一定有位置了
        pthread_cond_signal(&_pcond);//唤醒生产者
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size()==_maxcap;
    }
private:
    std::queue<T> _q;
    int _maxcap;//队列最大容量
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;//生产者对应的条件变量
    pthread_cond_t _ccond;//消费者对应的条件变量
};

模拟生产消费场景额代码如下:

#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

void* consumer(void* bq_)
{
    BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(bq_);
    while(true)
    {
        //消费
        int data;
        bq->pop(&data);
        std::cout<<"消费数据: "<<data<<std::endl;
    }
    return nullptr;
}

void* productor(void* bq_)
{
    BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(bq_);
    while(true)
    {
        //生产
        int data=rand()%10+1;//模拟生产的数据
        bq->push(data);
        std::cout<<"生产数据: "<<data<<std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueue<int>* bq=new BlockQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}

Makefile内容如下:

MainCp:MainCp.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f MainCp

需要注意,在代码中,我们设置生产者每生产一个数据就休眠一秒,而对消费者不做限制。所以,如果最后的执行结果是“消费者每一次消费的数据都是生产者最后一次生产的数据”的话,就证明了二者的共享资源是一个阻塞队列!

运行结果如下:

[sny@VM-8-12-centos blockqueue]$ ./MainCp
生产数据: 10
消费数据: 10
生产数据: 6
消费数据: 6
生产数据: 1
消费数据: 1
^C

可见,结果和预测相同!

下面对代码中的几个细节做出解释:
①调用pthread_cond_wait()接口时,第二个参数一定是正在使用的互斥锁

这是因为,某一个线程因为不满足条件时,要将自己挂起等待。但是不能影响其他线程申请锁,所以,这个接口会以原子性的方式将锁释放,并将该线程挂起。
②pthread_cond_wait()这个函数在被唤醒返回的时候,会自动重新获取线程所传入的锁,以便可以继续向下执行。
③判断是否满足条件时,不应该用if,而应该用while。

因为当多个线程同时被唤醒,但最终只有一个线程可以访问资源时,就会存在异常或伪唤醒的情况,所以先后才能醒来之后,必须再判断一次是否满足条件。
所以,要对上面的代码稍作改动:

void push(const T& in)
{
    pthread_mutex_lock(&_mutex);
    //1.判断
    while(is_full())
    {//生产条件不满足,无法生产,生产者进行等待
        pthread_cond_wait(&_pcond,&_mutex);
    }
    //2.这里一定没有满
    _q.push(in);
    //3.走到这里,阻塞队列里一定有数据了
    pthread_cond_signal(&_ccond);//唤醒消费者
    pthread_mutex_unlock(&_mutex);
}

判空的时候亦是如此!

④pthread_cond_signal()可以放在临界资源内部,也可以放在外部,只要能线程被唤醒即可。

3.3对生产消费任务的模拟封装

生产者和消费者的任务是不确定的,可能是各种各样的“业务”,所以,两者的任务应该是各种类型的。
接下来,就模拟一个小小的任务(包括加减乘除取模四种运算),新增一个Task.hpp文件,内容如下:

#pragma once
#include <iostream>
#include <functional>
#include <cstdio>

class Task
{
    using func_t =std::function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func)
    {}
    std::string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    func_t _callback;
    char _op;
};

将MainCp中代码稍作修改,如下:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

const std::string oper="+-*/%";

int mymath(int x,int y,char op)
{
    int result=0;
    switch(op)
    {
    case '+':
        result= x+y;
        break;
    case '-':
        result= x-y;
        break;
    case '*':
        result= x*y;
        break;
    case '/':
        if(y==0)
        {
            std::cerr<<"div zero error!"<<std::endl;
            result=-1;
        } 
        else
            result=x/y;
        break;
    case '%':
        if(y==0)
        {
            std::cerr<<"mod zero error!"<<std::endl;
            result=-1;
        } 
        else
            result=x%y;
        break;
    default:
        break;
    }
    return result;
}

void* consumer(void* bq_)
{
    BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(bq_);
    while(true)
    {
        //消费
        Task t;
        bq->pop(&t);
        std::cout<<"消费任务: "<<t()<<std::endl;
    }
    return nullptr;
}

void* productor(void* bq_)
{
    BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(bq_);
    while(true)
    {
        //生产
        int x=rand()%10+1;//模拟生产的数据
        int y=rand()%5;
        int operCode=rand()%oper.size();
        Task t(x,y,oper[operCode],mymath);
        bq->push(t);
        std::cout<<"生产任务: "<<t.toTaskString()<<std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueue<Task>* bq=new BlockQueue<Task>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}

最终执行结果如下:

[sny@VM-8-12-centos blockqueue]$ ./MainCp
生产任务: 6 + 1 = ?
消费任务: 6 + 1 = 7
生产任务: 9 % 0 = ?
mod zero error!
消费任务: 9 % 0 = -1
生产任务: 1 % 1 = ?
消费任务: 1 % 1 = 0
^C

四、遗留问题

下面来回答一下上文中没有回答的问题
创建多个生产者和消费者的意义是什么?生产消费模型的高效体现在哪里?

首先,我们需要知道,对于生产者而言,它的任务可能来自于各个地方,包括数据库、网络等等,它获取任务和构建任务都是需要花费时间的

其次,对于消费者而言,它从任务队列中取出任务,后续还要执行任务,也是需要花费时间的。

而生产消费模型可以让消费者在执行任务的时候,生产者也在生产任务。也可以保证一个消费者或生产者在执行或生成任务时,其他的生产者或消费者也可以进行同样的操作。可以实现在生产之前,消费之后,让线程并发执行

以上就是生产消费模型的左右内容!


本篇完,青山不改,绿水长流!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/427485.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于YOLOv5的水下海洋目标检测

摘要&#xff1a;水下海洋目标检测技术具有广泛的应用前景&#xff0c;可以用于海洋环境监测、海洋资源开发、海洋生物学研究等领域。本文提出了一种基于 YOLOv5 的水下海洋目标检测方法&#xff0c;使用数据增强方法进行了大量实验&#xff0c;并与其他方法进行了对比&#xf…

(SQL学习随笔3)SQL语法——SELECT语句

导航基本认识FROM关键字LIMIT与OFFSETORDER BYWHERE条件查询单值比较多条件组合范围筛选空值匹配LIKE通配条件分组运算符和函数数据变换分组运算表连接内连接左(右)外连接全外连接外键约束窗口函数UNION&#xff1a;表上下拼接子查询条件判断PostgreSQLMySQL基本认识 SELECT t…

【论文阅读】BiSeNet V2用于实时语义分割的双边网络

前言BiSeNet V2延续了v1版本的双边结构&#xff0c;分别处理空间细节信息、高层语义信息。同时设计更简洁高效的结构&#xff0c;进行特征提取&#xff0c;实现高精度和高速度。在训练模型时&#xff0c;使用了增强训练策略 &#xff0c;添加多个辅助训练分支来促进不同浅层网络…

Spring-aop面向切面

1、理解必要的专业术语 先看看上面图&#xff0c;这是我的个人理解。(画的丑&#xff0c;主打真实) 1&#xff09;Advice&#xff0c;通知/增强&#xff1a;类方法中提出来的共性功能(大白话就是提出来的重复代码) 2&#xff09;Pointcut&#xff0c;切入点/切点&#…

【微服务】微服务架构下你不得不知的3种部署策略

文章目录前言滚动部署蓝绿部署金丝雀部署总结前言 不知道大家有了解过你们公司的软件产品是如何部署的么&#xff1f;采用的什么部署策略&#xff1f;其实在软件开发生命周期中&#xff0c;部署是非常关键的一环&#xff0c;你需要考虑多方面的因素&#xff0c;如何保证你部署…

【推荐算法】CTR中embedding层的学习和训练

note 连续特征处理&#xff1a;facebook DLRM模型&#xff0c;对连续值的处理方式是把所有的连续值输入到一个神经网络&#xff0c;然后通过神经网络把它压缩到一个embedding维度大小的一个向量上&#xff0c;然后将Embedding和其他离散特征Embedding Concat起来&#xff0c;再…

springboot-分页功能

1.分页功能的作用 分页功能作为各类网站和系统不可或缺的部分&#xff08;例如百度搜索结果的分页等&#xff09; &#xff0c;当一个页面数据量大的时候分页作用就体现出来的&#xff0c;其作用有以下5个。 &#xff08;1&#xff09;减少系统资源的消耗 &#xff08;2&#…

redis设计与实现读书笔记

这里主要记录一下在阅读redis设计与实现中碰到的一些没有记录过的知识。 引用计数技术 Redis的对象系统实现了基于引用计数技术的内存回收机制&#xff0c;当程序不再使用某个对象的时候&#xff0c;这个对象所占用的内存就会被自动释放;另外&#xff0c;Redis还通过引用计数…

低调且强大--iVX低代码平台

iVX目录前言一、低代码那么多 为什么选择iVX&#xff1f;二、“拼”出来的低代码平台&#xff0c;真的好用吗&#xff1f;三、iVX与其他低代码有啥可比性&#xff1f;前言 首先我们应该明白自动编程突破可能是&#xff1a;领域内Mini LLM 现在的思路都是搞LLM&#xff0c;几乎像…

通俗举例讲解动态链接、静态链接

参考动态链接 - 知乎 加上我自己的理解&#xff0c;比较好懂&#xff0c;但可能在细节方面有偏差,但总体是一致的 静态链接的背景 静态链接使得不同的程序开发者和部门能够相对独立的开发和测试自己的程序模块&#xff0c;从某种意义上来讲大大促进了程序开发的效率&#xf…

NPC 也有了生命?当 ChatGPT 注入游戏你能想象吗

&#x1f34e;道阻且长&#xff0c;行则将至。&#x1f353; 目录引言&#xff1a;西部世界元宇宙&#xff0c;还记得吗ChatGPT 的世界&#xff1f;下图就是一个 ChatGPT 小镇&#xff1a; 引言&#xff1a;西部世界 《西部世界》以一个虚构的游戏般的“西部世界”为背景&am…

springboot验证码生成及验证功能

1.easy-captcha工具包 生成验证码的方式有许多种&#xff0c;这里选择的是easy-captcha工具包。 github开原地址为&#xff1a;easy-captcha工具包 其支持Java图形验证码&#xff0c;支持gif、中文、算术等类型&#xff0c;可用于Java Web、JavaSE等项目。 2添加依赖 首先需…

SQL Server的死锁说明

死锁指南一、了解死锁二、检测并结束死锁2.1、可能死锁的资源三、处理死锁四、最大限度地减少死锁4.1、以相同的顺序访问对象4.2、避免事务中的用户交互4.3、保持交易简短且在一个批次中4.4、使用较低的隔离级别4.5、使用基于行版本控制的隔离级别4.6、使用绑定连接4.7、停止事…

【云原生|Docker】04-docker的资源限制

目录 前言 容器的生命周期 1. 容器的启动过程 2. 容器的生命周期 ​编辑 内存限制 1. 内存限制的相关参数 2. 内存限制方式 2.1 设置-m,--memory&#xff0c;不设置--memory-swap 2.2 设置-m,--memorya&#xff0c;--memory-swapb&#xff0c;且b >a 2.…

本地从0搭建Stable Diffusion WebUI及错误记录

从0开始搭建本地Stable Diffusion WebUI环境 一.环境配置 1.使用的电脑配置 系统Windows10处理器英特尔 i7内存24GB显卡NVIDIA GTX 1060(6GB) 2.镜像源 阿里云 清华大学 中国科技大学 3.电脑环境变量配置 我的电脑–属性–高级系统设置–系统属性(高级)–环境变量 新建…

spring框架注解

3.Spring有哪些常用注解呢&#xff1f; Spring常用注解 Web: Controller&#xff1a;组合注解&#xff08;组合了Component注解&#xff09;&#xff0c;应用在MVC层&#xff08;控制层&#xff09;。 RestController&#xff1a;该注解为一个组合注解&#xff0c;相当于Con…

首个ChatGPT开发的应用上线;ChatMind思维导图工具;中文提示词大全;Copilot平替 | ShowMeAI日报

&#x1f440;日报&周刊合集 | &#x1f3a1;生产力工具与行业应用大全 | &#x1f9e1; 点赞关注评论拜托啦&#xff01; &#x1f916; 『一本与众不同的AI绘本』ChatGPT 编写故事 Midjourney 绘制插图 作者的女儿特别喜欢迪士尼动画《海洋奇缘》里的主人公莫阿娜&#…

Mybatis分解式查询

目录 一、Mybatis一对多分解式查询 1. 新增持久层接口方法 2. 新增映射文件对应的标签 3. 新增测试方法 4. 运行效果 二、Mybatis一对一分解式查询 1. 新增持久层接口方法 2. 新增映射文件对应的标签 3. 新增测试方法 4. 运行效果 三、Mybatis延迟加载 1. 开启延迟加…

超实用的十个超级实用事半功倍的Python自动化脚本

一淘模板 56admin.com在日常的工作学习当中&#xff0c;我们总会遇到各式各样的问题&#xff0c;其中不少的问题都是一遍又一遍简单重复的操作&#xff0c;不妨直接用Python脚本来自动化处理&#xff0c;今天小编就给大家分享十个Python高级脚本&#xff0c;帮助我们减少无谓的…

【数据结构与算法】栈的实现(附源码)

目录 一.栈的概念和结构 二.接口实现 A.初始化 Stackinit 销毁 Stackdestroy 1.Stackinit 2.Stackdestroy B.插入 Stackpush 删除 Stackpop 1.Stackpush 2.Stackpop C.出栈 Stacktop D. 栈的有效元素 Stacksize 判空 Stackempty 1.Stacksize 2.Stackempty …