【Linux】线程——生产者消费者模型、基于阻塞队列的生产消费者模型、基于环形队列的生产消费者模型、POSIX信号量的概念和使用

news2025/1/11 9:03:01

文章目录

  • Linux线程
    • 6. 生产消费者模型
      • 6.1 基于阻塞队列的生产消费者模型
        • 6.1.1 阻塞队列模型实现
      • 6.2 基于环形队列的生产消费者模型
        • 6.2.1 POSIX信号量的概念
        • 6.2.2 POSIX信号量的使用
        • 6.2.3 环形队列模型实现

Linux线程

6. 生产消费者模型

生产消费者模型的概念

  生产者消费者模型 是一种常见的并发编程模型。

  在这个模型中,通常存在两类角色:生产者和消费者。

  生产者负责生成数据或产品,并将其放入一个共享的缓冲区中。而消费者则从缓冲区中取出数据或产品进行消费处理。

  

为何要使用生产者消费者模型

  生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

  

  简单的说:就是为了解决生产者和消费者之间的速度不匹配问题。

  

生产者消费者模型优点

  解耦、支持并发、支持忙闲不均。

在这里插入图片描述
  

  上图中有"321"原则:

  3种关系:生产者和生产者(互斥)、消费者和消费者(互斥)、生产者和消费者(互斥、同步)。

  2种角色:生产者和消费者。

  1个交易场所:特定结构的内存空间。

  

6.1 基于阻塞队列的生产消费者模型

基于阻塞队列的生产消费者模型的概念

  基于阻塞队列的生产消费者模型 是一种用于协调生产者和消费者之间工作流程的编程模式。

  

实现的原理

  在这个模型中,生产者负责生成数据或产品,并将其放入一个阻塞队列中。阻塞队列是一种特殊的数据结构,当队列已满时,生产者尝试添加新元素的操作会被阻塞,直到队列有足够的空间。

  消费者则从这个阻塞队列中获取数据或产品进行处理。当队列为空时,消费者尝试获取元素的操作会被阻塞,直到生产者向队列中添加了新的元素。

  

在这里插入图片描述

  

6.1.1 阻塞队列模型实现

  这是我们的任务对象,一个简单的模拟加减乘除计算:

#pragma once
#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task()
    {}
    
    Task(int x,int y,char op)
        :_data1(x),_data2(y),_oper(op),_result(0),_exitcode(0)
    {}

    void run()
    {
        switch (_oper)
        {
        case '+':
            _result=_data1+_data2;
            break;
        case '-':
            _result=_data1-_data2;
            break;
        case '*':
            _result=_data1*_data2;
            break;
        case '/':
            {
                if(_data2==0) _exitcode=DivZero;
                else _result=_data1/_data2;
            }
            break;
        case '%':
            {
                if(_data2==0) _exitcode=ModZero;
                else _result=_data1%_data2;
            }
            break;
        default:
            _exitcode=Unknown;
            break;
        }
    }

    //Task对象重载运算符(),()直接进行run函数
    void operator()()
    {
        run();
    }

    std::string GetResult()
    {
        std::string r=std::to_string(_data1);
        r+=_oper;
        r+=std::to_string(_data2);
        r+="=";
        r+=std::to_string(_result);
        r+="[code: ";
        r+=std::to_string(_exitcode);
        r+="]";

        return r;
    }

    std::string GetTask()
    {
        std::string r=std::to_string(_data1);
        r+=_oper;
        r+=std::to_string(_data2);
        r+="=?";
        return r;
    }

    ~Task()
    {}

private: 
    int _data1;
    int _data2;
    char _oper;

    int _result;
    int _exitcode;
};

  

  这是我们实现的阻塞队列模型:

#pragma once

#include <iostream>
#include <queue>
#include <unistd.h>

template<class T>
class blockQueue
{
    static const int defaultnum=5;

public:
    //构造函数
    blockQueue(int maxp=defaultnum)
        :_maxp(maxp)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_p_cond,nullptr);
        pthread_cond_init(&_c_cond,nullptr);
    }

    T pop()
    {
        pthread_mutex_lock(&_mutex);
        if(_q.size()==0)
        {
            pthread_cond_wait(&_c_cond,&_mutex);
        }
        
        T out=_q.front();
        _q.pop();
        pthread_cond_signal(&_p_cond);

        pthread_mutex_unlock(&_mutex);

        return out;
    }

    void push(const T &in)
    {
        pthread_mutex_lock(&_mutex);

        if(_q.size()==_maxp)
        {
            pthread_cond_wait(&_p_cond,&_mutex);
        }
        
        _q.push(in);
        pthread_cond_signal(&_c_cond);

        pthread_mutex_unlock(&_mutex);
    }

    //析构函数
    ~blockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }

private:
    std::queue<T> _q; //生产消费队列
    int _maxp; //最大值
    pthread_mutex_t _mutex; //锁
    pthread_cond_t _p_cond; //生产同步队列
    pthread_cond_t _c_cond; //消费同步队列
};

  

  运行函数:

#include "blockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>

//消费者模型
void *Consumer(void *args)
{
    blockQueue<Task> *bp=static_cast<blockQueue<Task>*>(args);

    while(true)
    {
        Task t=bp->pop();

        t();

       std::cout<<"thread id: "<<pthread_self()<<" 处理了一个任务:"<<t.GetTask()\
       <<" 处理后的任务结果:"<<t.GetResult()<<std::endl;
        sleep(3);
    }
}

//生产者模型
void *Productor(void *args)
{
    blockQueue<Task> *bp=static_cast<blockQueue<Task>*>(args);

    int len=opers.size();
    int data;
    while(true)
    {
        int data1=rand()%10+1;
        usleep(10);
        int data2=rand()%10;
        char op=opers[rand()%len];
        Task t(data1,data2,op);

        bp->push(t);
        sleep(1);
        std::cout<<"thread id: "<<pthread_self()<<" 生产了一个任务:"<<t.GetTask()<<std::endl;
        sleep(2);
    }
}

int main()
{
    srand(time(nullptr));

    blockQueue<Task> *bp=new blockQueue<Task>();
    pthread_t c[3],p[5];
    for(int i=0;i<5;i++)
    {
        pthread_create(p+i,nullptr,Productor,bp);
    }
    for(int i=0;i<3;i++)
    {  
        pthread_create(c+i,nullptr,Consumer,bp);
    }
    
    for(int i=0;i<5;i++)
    {
        pthread_join(p[i],nullptr);
    }
    for(int i=0;i<3;i++)
    {
        pthread_join(c[i],nullptr);
    }
    delete bp;
    return 0;
}

  

  基于阻塞队列的生产消费者的模型实现了(就是打出来乱乱的):

在这里插入图片描述

  

6.2 基于环形队列的生产消费者模型

基于环形队列的生产消费者模型的概念

  基于环形队列的生产消费者模型是一种常见的并发编程模型,其中使用环形队列作为生产者和消费者之间共享的数据结构。

  

实现的原理

  环形队列是一种特殊的数据结构,它的特点是队列的尾部与头部相连,形成一个环状。在基于环形队列的生产消费者模型中,生产者负责向队列中添加数据,消费者负责从队列中取出数据。

  为了保证生产者和消费者之间的正确协作,需要遵循以下三个原则:

  生产者不能超过消费者:也就是说,生产者生产数据的速度不能超过消费者处理数据的速度,否则队列会溢出。

  生产者不能领先消费者一圈:也就是说,生产者和消费者之间不能有太大的差距,否则会导致数据丢失或重复。

  在同一位置时,生产者和消费者必须互斥:也就是说,当生产者和消费者在同一位置时,只能有一个进行操作,否则会导致数据不一致。

  

在这里插入图片描述

  

6.2.1 POSIX信号量的概念

  POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

  

实现的原理

  信号量本质上是一个整数,其值表示资源的可用数量。 通过 sem_wait 操作(也称为 P 操作)来请求资源,如果信号量的值大于 0 ,则减 1 并继续执行;如果值为 0 ,则阻塞当前线程或进程,直到信号量的值大于 0 。通过 sem_post 操作(也称为 V 操作)来释放资源,使信号量的值增加 1 ,并唤醒一个等待该信号量的线程或进程。

  

6.2.2 POSIX信号量的使用

  初始化信号量

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

  参数:

  pshared:0表示线程间共享,非零表示进程间共享

  value:信号量初始值

  

  销毁信号量

int sem_destroy(sem_t *sem);

  

  等待信号量

int sem_wait(sem_t *sem); //P()

  功能:等待信号量,会将信号量的值减1

  

  发布信号量

int sem_post(sem_t *sem);//V()

  功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。

  

6.2.3 环形队列模型实现

  任务继续使用上面的简单模拟计算。

  这是我们实现的环形队列模型:

#pragma once

//多生产多消费
#include <iostream>
#include <vector>
#include <ctime>
#include <semaphore.h>
#include <pthread.h>

const static int defaultcap=5;

template<class T>
class RingQueue
{
private:
    void P(sem_t &sem) //申请信号量
    {
        sem_wait(&sem);
    }

    void V(sem_t &sem) //发布信号量
    {
        sem_post(&sem);
    }

    void Lock(pthread_mutex_t &mutex) //加锁
    {
        pthread_mutex_lock(&mutex);
    }

    void Unlock(pthread_mutex_t &mutex) //解锁
    {
        pthread_mutex_unlock(&mutex);
    }

public:
    RingQueue(int capacity=defaultcap)
        :_ringqueue(capacity),_capacity(capacity),_c_step(0),_p_step(0)
    {
        sem_init(&_c_sem,0,0);
        sem_init(&_p_sem,0,capacity);

        pthread_mutex_init(&_c_mutex,nullptr);
        pthread_mutex_init(&_p_mutex,nullptr);
    }

    void Push(const T &in) //生产
    {
        P(_p_sem);

        Lock(_p_mutex); //先申请信号量,因为信号量是原子的
        _ringqueue[_p_step]=in;
        //向后移动,维持环形特性
        _p_step++;
        _p_step%=_capacity;
        Unlock(_p_mutex);

        V(_c_sem);
    }

    void Pop(T *out) //消费
    {
        P(_c_sem);

        Lock(_c_mutex);
        *out=_ringqueue[_c_step];
        //向后移动,维持环形特性
        _c_step++;
        _c_step%=_capacity;
        Unlock(_c_mutex);

        V(_p_sem);
    }

    ~RingQueue()
    {
        sem_destroy(&_c_sem);
        sem_destroy(&_p_sem);

        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }

private:
    std::vector<T> _ringqueue; //模拟环形队列
    int _capacity; //容量大小

    int _c_step; //消费者位置
    int _p_step; //生产者位置 

    sem_t _c_sem; //消费者信号量,关注的数据资源
    sem_t _p_sem; //生产者信号量,关注的空间资源

    pthread_mutex_t _c_mutex;
    pthread_mutex_t _p_mutex;
};

  

  运行的函数:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "RingQueue.hpp"
#include "Task.hpp"

using std::cout;
using std::endl;

struct ThreadData
{
    RingQueue<Task> *rq;
    std::string threadname;
};

void *Productor(void *args)
{
    //sleep(3);
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;
    int len = opers.size();

    while(true)
    {
        //获取数据
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        //生产数据
        rq->Push(t);
        cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;

        sleep(3); 
    }
    return nullptr;
}

void *Consumer(void *args)
{
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;

    while(true)
    {
        //消费数据
        Task t;
        rq->Pop(&t);

        //处理数据
        t();
        cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
        //sleep(1);
    }
    return nullptr;
}

int main()
{
    srand(time(nullptr));
    RingQueue<Task> *rq=new RingQueue<Task>();   

    pthread_t c[2],p[3];

    for(int i=0;i<3;i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Productor-" + std::to_string(i);

        pthread_create(p+i,nullptr,Productor,td);
    }
    for(int i=0;i<2;i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Consumer-" + std::to_string(i);

        pthread_create(c+i,nullptr,Consumer,td);
    }

    for(int i=0;i<3;i++)
    {
        pthread_join(p[i],nullptr);
    }
    for(int i=0;i<2;i++)
    {
        pthread_join(c[i],nullptr);
    }
    return 0;
}

  

  基于环形队列的生产消费者的模型实现了:

在这里插入图片描述

            

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1932770.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1-2、truffle与webjs亲密接触(truffle智能合约项目实战)

1-2、truffle与webjs亲密接触&#xff08;truffle智能合约项目实战&#xff09; 5&#xff0c;web3调用智能合约6&#xff0c;Ganache 5&#xff0c;web3调用智能合约 在前面已经完成简单的合约编写 使用web3调用此函数 Web端的代码使用web3进行智能合约的访问 首先在cmd以…

Burp安全扫描Web应用

一、浏览器设置代理 如下图所示&#xff0c;点击火狐浏览器的“扩展和主题”&#xff0c;搜索“代理”。 如下图所示&#xff0c;选择搜索到的第一个代理&#xff08;选择任何一个都可以&#xff09;。 如上图所示&#xff0c;第一个点击后&#xff0c;进入如下页面&#xff0…

ubuntu22.04 配置grpc(优化官方教程)

优化了官方教程&#xff0c;2024.7.17顺利打通。 一&#xff1a;添加环境变量 打开root文件夹下的 .bashrc 文件 编辑文件&#xff1a;滚动到文件的底部&#xff0c;然后添加以下行&#xff1a; export MY_INSTALL_DIR$HOME/.local mkdir -p "$MY_INSTALL_DIR" exp…

web安全之跨站脚本攻击xss

定义: 后果 比如黑客可以通过恶意代码,拿到用户的cookie就可以去登陆了 分类 存储型 攻击者把恶意脚本存储在目标网站的数据库中(没有过滤直接保存)&#xff0c;当用户访问这个页面时&#xff0c;恶意脚本会从数据库中被读取并在用户浏览器中执行。比如在那些允许用户评论的…

BiLSTM 实现股票多变量时间序列预测(PyTorch版)

前言 系列专栏:【深度学习&#xff1a;算法项目实战】✨︎ 涉及医疗健康、财经金融、商业零售、食品饮料、运动健身、交通运输、环境科学、社交媒体以及文本和图像处理等诸多领域&#xff0c;讨论了各种复杂的深度神经网络思想&#xff0c;如卷积神经网络、循环神经网络、生成对…

首个WebAgent在线评测框架和流程数据管理平台来了,GPT-4、Qwen登顶闭源和开源榜首!

在当今科技迅速发展的时代&#xff0c;大型语言模型&#xff08;Large Language Model&#xff0c;LLM&#xff09;正以前所未有的速度改变着我们与数字世界的互动方式。基于LLM的智能代理&#xff08;LLM Agent&#xff09;&#xff0c;从简单的信息搜索到复杂的网页操作&…

C1W4.Assignment.Naive Machine Translation and LSH

理论课&#xff1a;C1W4.Machine Translation and Document Search 文章目录 1. The word embeddings data for English and French words1.1The dataThe subset of dataLoad two dictionaries 1.2 Generate embedding and transform matricesExercise 1: Translating English…

防溺水预警系统引领水域安全新篇章

一、系统概述 随着人们对水域活动的需求增加&#xff0c;溺水事故频发&#xff0c;给人们的生命安全带来了严重威胁。然而&#xff0c;如今&#xff0c;一项创新科技正在以强大的功能和无限的潜力引领着水域安全的新篇章。智能防溺水预警系统&#xff0c;作为一种集成了智能感知…

CentOS 7 安装MySQL 5.7.30

CentOS 7 安装MySQL卸载&#xff08;离线安装&#xff09; 安装配置MySQL之前先查询是否存在&#xff0c;如存在先卸载再安装 rpm -qa|grep -i mysql rpm -qa|grep -i mariadb rpm -e --nodeps mariadb-libs-5.5.68-1.el7.x86_64如下命令找到直接 rm -rf 删除&#xff08;删除…

定制开发AI智能名片商城微信小程序在私域流量池构建中的应用与策略

摘要 在数字经济蓬勃发展的今天&#xff0c;私域流量已成为企业竞争的新战场。定制开发AI智能名片商城微信小程序&#xff0c;作为私域流量池构建的创新工具&#xff0c;正以其独特的优势助力企业实现用户资源的深度挖掘与高效转化。本文深入探讨了定制开发AI智能名片商城微信…

修改了mybatis的xml中的sql不重启服务器如何动态加载更新

目录 一、背景 二、注意 三、代码 四、使用示例 五、其他参考博客 一、背景 开发一个报表功能&#xff0c;好几百行sql&#xff0c;每次修改完想自测下都要重启服务器&#xff0c;启动一次服务器就要3分钟&#xff0c;重启10次就要半小时&#xff0c;耗不起时间呀。于是在…

获取欧洲时报中国板块前新闻数据-scrapy

这里写目录标题 1.创建项目文件二.爬虫文件编写三.管道存储四.settings文件 1.创建项目文件 创建scrapy项目的命令&#xff1a;scrapy startproject <项目名字> 示例&#xff1a; scrapy startproject myspiderscrapy genspider <爬虫名字> <允许爬取的域名>…

tinymce富文本支持word内容同时粘贴文字图片上传 vue2

效果图 先放文件 文件自取tinymce: tinymce富文本简单配置及word内容粘贴图片上传 封装tinymce 文件自取&#xff1a;tinymce: tinymce富文本简单配置及word内容粘贴图片上传 页面引用组件 <TinymceSimplify refTinymceSimplify v-model"knowledgeBlockItem.content…

vue使用audio 音频实现播放与关闭(可用于收到消息给提示音效)

这次项目中因为对接了即时通讯 IM&#xff0c;有个需求就是收到消息需要有个提示音效&#xff0c;所以这里就想到了用HTML5 提供的Audio 标签&#xff0c;用起来也是很方便&#xff0c;首先让产品给你个提示音效&#xff0c;然后你放在项目中&#xff0c;使用Audio 标签&#x…

HardeningMeter:一款针对二进制文件和系统安全强度的开源工具

关于HardeningMeter HardeningMeter是一款针对二进制文件和系统安全强度的开源工具&#xff0c;该工具基于纯Python开发&#xff0c;经过了开发人员的精心设计&#xff0c;可以帮助广大研究人员全面评估二进制文件和系统的安全强化程度。 功能特性 其强大的功能包括全面检查各…

【BUG】已解决:WslRegisterDistribution failed with error: 0x800701bc

已解决&#xff1a;WslRegisterDistribution failed with error: 0x800701bc 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c;211科班出身&#xff0c;就职于医疗科技公司&#xff0c;热衷分享知识&#xff0c;武…

Ubuntu22.04安装CUDA+CUDNN+Conda+PyTorch

步骤&#xff1a; 1、安装显卡驱动&#xff1b; 2、安装CUDA&#xff1b; 3、安装CUDNN&#xff1b; 4、安装Conda&#xff1b; 5、安装Pytorch。 一、系统和硬件信息 1、Ubuntu 22.04 2、显卡&#xff1a;4060Ti 二、安装显卡驱动 &#xff08;已经安装的可以跳过&a…

通过SchedulingConfigurer 接口完成动态定时任务

通过SchedulingConfigurer 接口完成动态定时任务 一.背景 在Spring中&#xff0c;除了使用Scheduled注解外&#xff0c;还可以通过实现SchedulingConfigurer接口来创建定时任务。它们之间的主要区别在于灵活性和动态性。Scheduled注解适用于固定周期的任务&#xff0c;一旦任…

【STM32 HAL库】I2S的使用

使用CubeIDE实现I2S发数据 1、配置I2S 我们的有效数据是32位的&#xff0c;使用飞利浦格式。 2、配置DMA **这里需要注意&#xff1a;**i2s的DR寄存器是16位的&#xff0c;如果需要发送32位的数据&#xff0c;是需要写两次DR寄存器的&#xff0c;所以DMA的外设数据宽度设置16…

JavaWeb服务器-Tomcat(Tomcat概述、Tomcat的下载、安装与卸载、启动与关闭、常见的问题)

Tomcat概述 Tomcat服务器软件是一个免费的开源的web应用服务器。是Apache软件基金会的一个核心项目。由Apache&#xff0c;Sun和其他一些公司及个人共同开发而成。 由于Tomcat只支持Servlet/JSP少量JavaEE规范&#xff0c;所以是一个开源免费的轻量级Web服务器。 JavaEE规范&…