【Linux】生产者消费者模型——环形队列RingQueue(信号量)

news2024/12/23 16:29:36

文章目录

  • 铺垫
  • 信号量
    • 信号量概念
    • 信号量PV操作
    • 信号量基本接口
  • 环形队列的生产消费模型
    • 引入环形队列
    • 访问环形队列
    • 代码实现
    • 代码改造
    • 多生产者多消费者代码
  • 总结

铺垫

之前写的代码是存在不足的地方的:

image-20230423174805160

我们使用线程操作临界资源的时候要先去判断临界资源是否满足条件:并不能事前得知,只能通过先加锁判断,再检测,再操作、解锁,因为我们在操作临界资源的时候,有可能不就绪,但是我们无法提前得知,所以只能先加锁再检测,根据检测结果,决定下一步怎么走,那我们能不能通过一种办法提前得知是否满足条件呢?这样就不用加锁了,直接让线程等待或者访问:答案就是信号量

信号量

信号量概念

什么是信号量?

只要我们对资源进行整体加锁就默认了我们对这个资源整体使用,实际情况可能存在一份公共资源,但是允许同时访问不同的区域!(程序员编码保证不同的线程可以并发访问公共资源的不同区域!)

  • 信号量本质是一把计数器,衡量临界资源中资源数量多少的计数器

  • 只要拥有信号量,就在未来一定能够拥有临界资源的一部分,申请信号量的本质:对临界资源中特定小块资源的预定机制。**比如电影院买票预定座位

  • 只要申请成功,就一定有你的资源,只要申请失败,就说明条件不就绪,你只能等,就不需要判断了

线程要进行访问临界资源中的某一区域——得先申请信号量——前提是所有人必须先看到信号量——所以信号量本身必须是:公共资源。

信号量PV操作

P操作:sem–,申请操作,必须保证操作的原子性

V操作:sem++,归还资源,必须保证操作的原子性

信号量的核心操作就是PV原语

信号量基本接口

初步看一下信号量的基本使用接口:

#include <semaphore.h>
	//信号量初始化
int sem_init(sem_t *sem, int pshared, unsigned int value)
//sem:自己定义的信号量变量
//pshared:0表示线程间共享,非零表示进程间共享。
//value:信号量初始值(资源数目)。

    
    //信号量销毁
int sem_destroy(sem_t *sem)

    
    //信号量等待
int sem_wait(sem_t *sem):p操作,--

    
    //信号量发布
int sem_pos(sem_t *sem):V操作,++

环形队列的生产消费模型

引入环形队列

环形队列之前我们就了解过了,只要是环形队列,就存在判空判满的问题。实际上并不是真正的环形队列,而是通过数组模拟的,当数据加入到最后的位置时直接模等于数组的大小即可。通常情况下,判空判满的问题我们是通过空出一个位置,当两个指针指向同一个位置的时候是空,当只剩一个位置的时候就是满,但是我们这里不需要关注。

访问环形队列

生产者和消费者访问同一个位置的情况:空的时候,满的时候;其他情况下生产者与消费者访问的就是不同的区域了。

为了完成环形队列的生产消费,我们的核心工作就是

1.消费者不能超过生产者

2.生产者不能套消费者一个圈以上

3.生产者和消费者指向同一个位置时,如果此时满了就让消费者先走,如果此时为空就让生产者先走

大部分情况下生产者与消费者是并发执行的,但是当环形队列为空或为满的时候就会存在着同步与互斥问题。

如何去进行保证:信号量维护,信号量是衡量临界资源中资源数量的

资源是什么:

1.对于生产者,看中的是队列中的剩余空间,空间资源定义成一个信号量

2.对于消费者,看中的是队列中的数据资源,数据资源定义成一个信号量

比如我们一共有10个位置,消费者初始信号量是0,生产者初始信号量是10,如果生产者线程生产数据,申请信号量,进行P操作,信号量变为9,申请失败则阻塞;申请成功后消费者线程看到了多一个数据资源,消费者信号量进行V操作.所以我们并不需要进行判空判满:当生产者生产满了,信号量申请不到,进行阻塞,只能让消费者先走;当消费者消费完了,信号量申请不到,只能让生产者先走

代码实现

单生产单消费的环形队列生产者消费者模型,利用随机数生成数据资源,通过生产线程与消费线程进行数据的生成与数据的消费:

#pragma once
#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>
static const int gcap = 5;
template<class T>
class RingQueue
{
private:
    void P(sem_t&sem)
    {
        int n  =sem_wait(&sem);
        assert(n==0);
        (void)n;
    }
    void V(sem_t&sem)
    {
        int n = sem_post(&sem);
        assert(n==0);
        (void)n;
    }
public:
    RingQueue(const int&cap = gcap):_queue(cap),_cap(cap)
    {
        int n = sem_init(&_spaceSem,0,_cap);
        assert( n == 0);
        n = sem_init(&_dataSem, 0, 0);
        assert(n==0);  
       _productorStep = _consumerStep = 0;
    }

    //生产者——空间
    void Push(const T&in)
    {
        P(_spaceSem);//申请到了空间信号量,意味着我们一定能进行正常的生产
        _queue[_productorStep++] = in;
        _productorStep%=_cap;
        V(_dataSem);
    }

    //消费者——数据
    void Pop(T *out)
    {
        P(_dataSem);
        *out = _queue[_consumerStep--];
        _consumerStep%=_cap;
        V(_spaceSem);
    }

    ~RingQueue()
    {
       sem_destroy(&_spaceSem);
       sem_destroy(&_dataSem);
    }
private:
    std::vector<T> _queue;
    int _cap;
    sem_t _spaceSem;//生产者想生产,看中空间资源
    sem_t  _dataSem;//消费者想消费,看中数据资源
    int _productorStep;
    int _consumerStep;
};
#include "RingQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
void*ProductorRoutine(void*rq)
{
    RingQueue<int>*ringqueue = static_cast<RingQueue<int>*>(rq);
    while(true)
    {
        int data = rand()%10+1;
        ringqueue->Push(data);
        std::cout<<"生产完成,生产的数据是:"<<data<<std::endl;
    }
}
void*ConsumerRoutine(void*rq)
{
    RingQueue<int>*ringqueue = static_cast<RingQueue<int>*>(rq);
    while(true)
    {
        int data;
        ringqueue->Pop(&data);
        std::cout<<"消费完成,消费的数据是:"<<data<<std::endl;
        sleep(1);
    }
}
int main()
{
    srand((unsigned int) time(nullptr)^getpid()^pthread_self()^0x7432);
    RingQueue<int>*rq = new RingQueue<int>();
    pthread_t p,c;
    pthread_create(&p,nullptr,ConsumerRoutine,rq);
    pthread_create(&c,nullptr,ProductorRoutine,rq);
    
    pthread_join(p,nullptr);
    pthread_join(c,nullptr);
    delete rq;
    return 0;
}

代码改造

实际上,生产线程和消费线程可不单单只能通过整型,我们还可以生产和消费任务,下面,我们只需要进行简单的改造即可完成:

Task.hpp:完成计算器的任务:计算两个数的加减乘除模

对于任务类Task:包含两个数x与y,以及计算方式op,以及计算的回调方法callback。

同时为了后面的生产线程和消费线程能够清楚看到过程,提供了两个方法一个是重载(),把计算的结果保存于字符串并放回,此方法用于消费者线程在队列中取出任务,把结果打印出来;另一个方法是toTaskString(),把计算的过程保存于字符串并返回,此方法用于生产者线程生产任务存放队列中,并且可以把过程打印出来

外部通过构造任务类对象t,传入生成的随机数x与y,以及随机生成的计算方式op,同时传入了计算的方法mymath,进行计算。

Task.hpp:

#pragma once
#include <iostream>
#include <functional>
#include <cstdio>
#include <cstring>
class Task
{
    using func_t = std::function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
    :_x(x),_y(y),_op(op),_callback(func)
    {}
    std::string operator()()
    {
        int result = _callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
const std::string oper = "+-*/%"; 
int mymath(int x,int y,char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
        break;
    default:
        break;
    }
    return result;
}

Main.cc

#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
void*ProductorRoutine(void*rq)
{
    RingQueue<Task>*ringqueue = static_cast<RingQueue<Task>*>(rq);
    while(true)
    {
        //构建任务
        int x = rand()%10;
        int y = rand()%5;
        char op = oper[rand()%oper.size()];
        Task t(x,y,op,mymath);
        //生产任务
        ringqueue->Push(t);
        std::cout<<"生产者派发了一个任务:"<<t.toTaskString()<<std::endl;
        //sleep(1);
    }
}
void*ConsumerRoutine(void*rq)
{
    RingQueue<Task>*ringqueue = static_cast<RingQueue<Task>*>(rq);
    while(true)
    {
        //构建任务
        Task t;
        //消费任务
        ringqueue->Pop(&t);
        std::string result = t();
        std::cout<<"消费者消费了一个任务:"<<result<<std::endl;
        sleep(1);
    }
}
int main()
{
    srand((unsigned int) time(nullptr)^getpid()^pthread_self()^0x7432);
    RingQueue<Task>*rq = new RingQueue<Task>();
    pthread_t p,c;
    pthread_create(&p,nullptr,ConsumerRoutine,rq);
    pthread_create(&c,nullptr,ProductorRoutine,rq);

    pthread_join(p,nullptr);
    pthread_join(c,nullptr);
    delete rq;
    return 0;
}

多生产者多消费者代码

只要保证,最终进入临界区的是一个生产,一个消费就行,所以我们需要在环形队列提供的Push与Pop加锁,所以环形队列提供了多两个成员变量:一个是生产线程的锁,一个是消费线程的锁,也就是需要加两把锁,你拿你的,我拿我的

RingQueue.hpp

#pragma once
#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>
#include <pthread.h>
static const int gcap = 5;
template<class T>
class RingQueue
{
private:
    void P(sem_t&sem)
    {
        int n  =sem_wait(&sem);
        assert(n==0);
        (void)n;
    }
    void V(sem_t&sem)
    {
        int n = sem_post(&sem);
        assert(n==0);
        (void)n;
    }
public:
    RingQueue(const int&cap = gcap):_queue(cap),_cap(cap)
    {
        int n = sem_init(&_spaceSem,0,_cap);
        assert(n == 0);
        n = sem_init(&_dataSem, 0, 0);
        assert(n==0);  
       _productorStep = _consumerStep = 0;
       pthread_mutex_init(&_pmutex,nullptr);
       pthread_mutex_init(&_cmutex,nullptr);
    }
 
    void Push(const T&in)
    {
        P(_spaceSem);//申请到了空间信号量,意味着我们一定能进行正常的生产
        pthread_mutex_lock(&_pmutex);
        _queue[_productorStep++] = in;
        _productorStep%=_cap;
        pthread_mutex_unlock(&_pmutex);
        V(_dataSem);
    }

    void Pop(T *out)
    {
        P(_dataSem);
        pthread_mutex_lock(&_cmutex);
        *out = _queue[_consumerStep++];
        _consumerStep%=_cap;
        pthread_mutex_unlock(&_cmutex);
        V(_spaceSem);
    }
    ~RingQueue()
    {
       sem_destroy(&_spaceSem);
       sem_destroy(&_dataSem);
       pthread_mutex_destroy(&_pmutex);
       pthread_mutex_destroy(&_cmutex);
    }
private:
    std::vector<T> _queue;
    int _cap;
    sem_t _spaceSem;
    sem_t  _dataSem;
    int _productorStep;
    int _consumerStep;
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};

Main.cc

#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
std::string SelfName()
{
    char name[128];
    snprintf(name,sizeof(name),"thread[%0x%x]",pthread_self());
    return name;
}
void*ProductorRoutine(void*rq)
{
    RingQueue<Task>*ringqueue = static_cast<RingQueue<Task>*>(rq);
    while(true)
    {
        int x = rand()%10;
        int y = rand()%5;
        char op = oper[rand()%oper.size()];
        Task t(x,y,op,mymath);
        //生产任务
        ringqueue->Push(t);
        std::cout<<SelfName()<<",生产者派发了一个任务:"<<t.toTaskString()<<std::endl;
        //sleep(1);
    }
}
void*ConsumerRoutine(void*rq)
{
    RingQueue<Task>*ringqueue = static_cast<RingQueue<Task>*>(rq);
    while(true)
    {
        Task t;
        //消费任务
        ringqueue->Pop(&t);
        std::string result = t();
        std::cout<<SelfName()<<",消费者消费了一个任务:"<<result<<std::endl;
        sleep(1);
    }
}
int main()
{
    srand((unsigned int) time(nullptr)^getpid()^pthread_self()^0x7432);
    RingQueue<Task>*rq = new RingQueue<Task>();
    pthread_t p[4],c[8];
    for(int i = 0;i<4;i++) pthread_create(p+i,nullptr,ProductorRoutine,rq);
    for(int i = 0 ;i<8;i++) pthread_create(c+i,nullptr,ConsumerRoutine,rq);

    for(int i = 0;i<4;i++) pthread_join(p[i],nullptr);
    for(int i = 0 ;i<8;i++) pthread_join(c[i],nullptr);
    return 0;
}

总结

多生产多消费的意义:不管是环形队列还是阻塞队列,多线程的意义在于构建or获取任务是要花时间的,效率比较低,当消费的时候也是要花时间的,不单单只是拿出来就行了,所以多生产多消费的时候的意义在于生产之前,消费之后,处理任务获取任务的时候本身也是要花费时间的,可以在生产之前与消费之后让线程并行执行。

条件变量是一种同步机制,它允许线程等待某个条件的发生,通常与互斥锁一起使用。而信号量是一种计数器,它可以用于控制对共享资源的访问;如果想让每一刻只有一个线程访问共享资源,可以使用条件变量。但如果需要允许多个线程并发访问共享资源的不同区域,则可以使用信号量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/454608.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QoS部署

1.总部和分部之间视频出现花屏,语音图像不同步的现象是有哪些原因导致的? 如图所示总部和分部之间的流量有FTP数据流量、视频流量和语音流量。如果在总部和分部之间的这几类流量没有做QoS部分或者优先级的区分,那么这些流量基于默认的无差别的流量策略去竞争带宽,如果FTP数…

暴躁兔分享:火爆圈子的XEN,我们如何吃到一波红利的

这周沉闷的熊市突然冲出一个XEN项目。 在这个项目上我们经历了拿到消息&#xff0c;看不懂不做&#xff0c;获取新的信息&#xff0c;发现可以搞&#xff0c;冲的这么一个过程。虽然由于信息查和认知差没有吃到最早拿到信息那波的利润&#xff0c;但是通过分析也跟着喝了一点汤…

网页源代码检查时隐藏 WebSocket 的后端地址

背景 近期在自研如何通过 OpenAI 实现 与ChatGPT 官网一样的聊天对话效果。 用到了 html5websocketpython 三项技术 , 于是用一天时间自学了一下这方面技术。 当实现了功能之后&#xff0c;就得考虑安全问题&#xff1a; 在用 html5 实现与 websocket 通讯时&#xff0c;如何保…

管理后台项目-05-SKU列表-上下架-详情抽屉效果-深度选择器

目录 1-SKU列表 2-SKU上下架 3-SKU详情 1-SKU列表 当用户点击Sku管理&#xff0c;组件挂载的时候&#xff0c;我们需要获取sku列表信息&#xff1b;但是获取列表方法在分页列表改变页码和每页显示大小的时候也需要触发&#xff0c;我们封装为一个方法。 //sku列表的接口 /adm…

Elasticsearch聚合、自动补全 | 黑马旅游

一、数据聚合 1、聚合的分类 聚合&#xff08;aggregations&#xff09;可以实现对文档数据的统计、分析、运算。 聚合常见有三类&#xff1a; 桶聚合 Bucket&#xff1a;对文档数据分组&#xff0c;并统计每组数量 TermAggregation&#xff1a;按照文档字段值分组&#xf…

中国电子学会2023年03月青少年软件编程Scratch图形化等级考试试卷二级真题(含答案)

中国电子学会2023年03月青少年软件编程Scratch图形化等级考试试卷二级 1.小猫的程序如图所示&#xff0c;积木块的颜色与球的颜色一致。点击绿旗执行程序后&#xff0c;下列说法正确的是&#xff1f;&#xff08;C&#xff09;&#xff08;2分&#xff09; A.小猫一直在左右移…

JDBC详解(二):获取数据库连接(超详解)

JDBC详解&#xff08;二&#xff09;&#xff1a;获取数据库连接 前言一、要素一&#xff1a;Driver接口实现类1、Driver接口介绍2、加载与注册JDBC驱动 二、要素二&#xff1a;URL三、要素三&#xff1a;用户名和密码四、数据库连接方式举例4.1 连接方式一4.2 连接方式二4.3 连…

15天学习MySQL计划-数据库引擎(进阶篇)第六天

15天学习MySQL计划-数据库引擎&#xff08;进阶篇&#xff09;第六天 1.数据库引擎 1.MySQL体系结构 连接层服务层引擎层存储层 2.存储引擎 存储引擎简介 ​ 1.概述 ​ 存储引擎就是存储数据&#xff0c;建立索引&#xff0c;更新/查询数据等技术的实现方式。存储引擎是基…

android ANativeWindow surface显示

前言 最近做车机camera 倒车影像问题&#xff0c;需要通过c调用camera&#xff0c;并显示在android ui界面之上。 最终效果图 代码实现 Android.bp cc_binary {name: "stest",vendor: true,srcs: ["main.cpp"],shared_libs: ["libcutils",&q…

Android入门基础教程

第1章 Android Studio运行第一个程序 1.1 Android Studio下载&#xff1a; 1.1.1 Android开发者官网&#xff1a; https://developer.android.google.cn ​ 1.1.2 下载Android Studio开发者工具&#xff1a; 进入Android开发者官网&#xff1b;找到Android Studio工具下载页…

【LeetCode刷题笔记】反转链表、移除链表元素、两两交换链表中的节点、删除链表的倒数第N个结点

&#x1f4dd;个人主页&#xff1a;爱吃炫迈 &#x1f48c;系列专栏&#xff1a;数据结构与算法 &#x1f9d1;‍&#x1f4bb;座右铭&#xff1a;道阻且长&#xff0c;行则将至&#x1f497; 文章目录 反转链表移除链表元素两两交换链表中的节点删除链表的倒数第 N 个结点&…

脉搏波信号去噪方法

一、引言 脉搏波信号是血管中血液流动产生的振动信号&#xff0c;反映了血管的弹性特性和血流动力学信息。由于其丰富的生理信息&#xff0c;脉搏波信号在诊断和监测心血管疾病、神经系统疾病等方面具有重要意义。然而&#xff0c;原始脉搏波信号往往受到生理干扰、环境噪声等…

人工智能在心电信号的心律失常应用

心律失常是一种常见的心脏疾病&#xff0c;它会导致心脏跳动不规律&#xff0c;严重的甚至会引发心脏骤停。传统的心律失常诊断方法是通过心电图对心律失常进行分析&#xff0c;但是这种方法需要医生具备专业的知识和经验&#xff0c;而且容易出现误诊。而人工智能技术可以对心…

倾斜摄影三维模型OSGB格式转换3DTILES的关键技术浅析

倾斜摄影三维模型OSGB格式转换3DTILES的关键技术浅析 将三维模型从OSGB格式转换为3DTILES格式需要掌握以下关键技术&#xff1a; 1、数据结构转换&#xff1a;OSGB格式和3DTILES格式采用了不同的数据结构&#xff0c;因此需要进行数据结构转换。OSGB格式采用了分层划分数据结构…

D3.js(3) path/折线图

一、概念 path 元素是用来绘制各种形状&#xff08;例如线条、曲线、弧形、圆弧等&#xff09;的元素。path 元素的 d 属性用来定义绘制的路径。具体来说&#xff0c;d 属性是一个字符串&#xff0c;包含一系列的命令和参数&#xff0c;用来描述路径的形状。 1.1 d属性 Mmov…

Linux子进程信号处理机制

Linux中子进程的信号处理与父进程的联系有以下三条&#xff1a; fork后子进程会继承父进程绑定的信号处理函数&#xff08;很好解释&#xff0c;子进程会拷贝父进程的代码&#xff0c;包括信号处理函数&#xff09;如果子进程调用exec族函数&#xff0c;子进程代码段被新的程序…

Qt 项目Mingw编译器转换为VS编译器时的错误及解决办法

错误 在mingw生成的项目&#xff0c;转换为VS编译器时通常会报些以下错误&#xff08;C4819警告&#xff0c;C2001错误&#xff0c;C2143错误&#xff09; 原因及解决方式 这一般是由于字符编码引起的&#xff0c;在源代码文件中包含了中文字符导致的。Qt Creator 生成的代码文…

算法的时间复杂度和空间复杂度分析

文章目录 实验目的实验内容实验过程运行结果复杂度分析 实验目的 通过本次实验&#xff0c;了解算法复杂度的分析方法&#xff0c;掌握递归算法时间复杂度的递推计算过程。 实验内容 二路归并排序的算法设计和复杂度分析。 实验过程 1.算法设计 归并排序&#xff1a;是指将…

活动回顾|多模态 AI 开发者的线下聚会@深圳站(内含福利)

回顾来了&#xff01; 4 月 22 日&#xff0c;由 Jina AI 和 OpenMMLab 联合主办的 「多模态 AI 」Office Hours 深圳站圆满结束&#xff0c;迎来了将近 60 位开发者的热情参与&#xff01;现场不仅有别开生面的「开发者集市」供大家打卡赢取好礼&#xff0c;更有四场干货满满的…

传统机器学习(六)集成算法(1)—随机森林算法及案例详解

传统机器学习(六)集成算法(1)—随机森林算法及案例详解 1、概述 集成学习&#xff08;Ensemble Learning&#xff09;就是通过某种策略将多个模型集成起来&#xff0c;通过群体决策来提高决策准确率。 集成学习首要的问题是选择什么样的学习器以及如何集成多个基学习器&…