基于循环队列和信号量的生产和消费者模型

news2025/1/10 16:19:19

这一节为什么要基于信号量来实现同一个模型,原因:

 void push(const T& in)
    {
        pthread_mutex_lock(&_lock);
        while(is_Full())
        {
            //这里说明阻塞队列是满的,需要让生产者等待
            pthread_cond_wait(&_pcond,&_lock);
            
        }
        //这里说明阻塞队列至少有一个空位可以插入
        _queue.push(in);
        //唤醒消费者去消费
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_lock);
        
    }

在我们访问公共资源的时候,消费者需要竞争同一把锁,然后还要继续判断是否能在临界区中生产数据。如果竞争到了锁然后判断不能生产数据,则需要继续等待。竞争锁需要消耗时间,判断等待也需要,这就导致了程序效率的低下。因此信号量的存在就解决了这一个问题:在竞争锁之前就提前预知了临界资源是否就绪。

POSIX信号量

信号量就相当于一个计数器,计数了临界资源中资源的数目,接下来我们学习使用它的接口:

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0 表示线程间共享,非零表示进程间共享
value :信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减 1
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1
int sem_post(sem_t *sem);//V()

基队于环形列的生产消型费模型

引入循环队列

循环队列是一个逻辑结构,是由数组演变而成。通过对数组的下标访问进行模运算变成一个循环访问,这就是循环队列。当消费者和生产者在同一位置时,可能循环队列为空,另一种可能就是生产者消费满了,消费者来不及消费。因此其余时间生产消费者都不在同一位置生产和消费数据。

实现模型的三个必要条件

1.生产者不能套消费者一圈。

2.消费者不能超越生产者。

3.如果生产者和消费者在同一个位置,队列为空让生产者先走,队列为满让消费者先走。

引入信号量

通过信号量我们可以预知知道队列中资源的数目,如果生产者套了消费者一圈,这时生产者就申请不到信号量,只有消费者能够申请,因此只有消费者可以消费,生产者必须等待。同理消费者把数据消费光和生产者在同一位置时,消费者申请不到信号量只有生产者可以,因此只有生产者可以生产而消费者必须等待,通过引入信号量满足了该模型的三个必要条件。

代码实现

简易版的代码实现:

main.cc

#include <iostream>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
#include <unistd.h>
#include "RingQueue.hpp"

using namespace std;

void *consumer(void *queue)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(queue);
    while (true)
    {
        int result = 0;
        rq->pop(&result);
        cout << "消费者消费了一个数据:" << result << endl;
        sleep(1);
    }
}

void *product(void *queue)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(queue);
    while (true)
    {
        //(1);
        int data = rand() % 10 + 1;
        rq->push(data);
        cout << "生产者生产了一个数据:" << data << endl;
    }
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123456);
    RingQueue<int> *rq = new RingQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, (void *)rq);
    pthread_create(&p, nullptr, product, (void *)rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete rq;
    return 0;
}

RingQueue.hpp 

#include <iostream>
#include <semaphore.h>
#include <vector>
#include <cassert>

using namespace std;

static const int gcap = 6;

template <class T>
class RingQueue
{

private:
    void P(sem_t *sem) // 申请信号量 --
    {
        int n = sem_wait(sem);
        assert(n == 0);
        (void)n;
    }

    void V(sem_t *sem) // 释放信号量 ++
    {
        int n = sem_post(sem);
        assert(n == 0);
        (void)n;
    }

public:
    RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap)
    {

        int n = sem_init(&_spaceSem, 0, cap);
        assert(n == 0);
        int m = sem_init(&_dataSem, 0, 0);
        assert(m == 0);
        _productStep = _consumerStep = 0;
    }

    void push(const T &in)
    {
        P(&_spaceSem); // 申请空间信号量
        _queue[_productStep++] = in;
        _productStep %= _cap;

        V(&_dataSem); // 释放数据信号量
    }

    void pop(T *out)
    {
        P(&_dataSem);
        *out = _queue[_consumerStep++];
        _consumerStep %= _cap;

        V(&_spaceSem);
    }

    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
    }

private:
    std::vector<T> _queue;
    int _cap;
    sem_t _spaceSem;
    sem_t _dataSem;
    int _productStep;
    int _consumerStep;
};

引入计算任务:

Task.hpp

#include <iostream>
#include <functional>
#include <string>

class CalTask
{
public:
    using func_t = std::function<int(int, int, char)>;

    CalTask() {}

    CalTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
    {
    }

    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[64];
        snprintf(buffer, sizeof buffer, "%d %c %d =%d", _x, _op, _y, result);
        return buffer;
    }

    std::string to_string()
    {
        char buffer[64];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

    ~CalTask()
    {
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;

    case '/':
        if (y == 0)
        {
            std::cerr << "div zero error" << std::endl;
            break;
        }
        result = x / y;
        break;

    case '%':
        if (y == 0)
        {
            std::cerr << "mod zero error" << std::endl;
            break;
        }
        result = x % y;
        break;
    }
    return result;
}

RingQueue.hpp

#include <iostream>
#include <semaphore.h>
#include <vector>
#include <string>
#include <cassert>

using namespace std;

static const int gcap = 6;
static const string symbol ="+-*/%";

template <class T>
class RingQueue
{

private:
    void P(sem_t *sem) // 申请信号量 --
    {
        int n = sem_wait(sem);
        assert(n == 0);
        (void)n;
    }

    void V(sem_t *sem) // 释放信号量 ++
    {
        int n = sem_post(sem);
        assert(n == 0);
        (void)n;
    }

public:
    RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap)
    {

        int n = sem_init(&_spaceSem, 0, cap);
        assert(n == 0);
        int m = sem_init(&_dataSem, 0, 0);
        assert(m == 0);
        _productStep = _consumerStep = 0;
    }

    void push(const T &in)
    {
        P(&_spaceSem); // 申请空间信号量
        _queue[_productStep++] = in;
        _productStep %= _cap;

        V(&_dataSem); // 释放数据信号量
    }

    void pop(T *out)
    {
        P(&_dataSem);
        *out = _queue[_consumerStep++];
        _consumerStep %= _cap;

        V(&_spaceSem);
    }

    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
    }

private:
    std::vector<T> _queue;
    int _cap;
    sem_t _spaceSem;
    sem_t _dataSem;
    int _productStep;
    int _consumerStep;
};

main.cc

#include <iostream>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
#include <unistd.h>
#include "RingQueue.hpp"
#include "Task.hpp"

using namespace std;

void *consumer(void *queue)
{
    RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(queue);
    while (true)
    {
        CalTask result ;
        rq->pop(&result);
        cout << "消费者处理了一个任务:" << result() << endl;
        sleep(1);
    }
}

void *product(void *queue)
{
    RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(queue);
    while (true)
    {
        int x =rand()%10;
        int y =rand()%10;
        char op =symbol[rand()%symbol.size()];
        CalTask t(x,y,op, mymath);
        rq->push(t);
        cout << "生产者生产了一个任务:" << t.to_string() << endl;
    }
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123456);
    RingQueue<CalTask> *rq = new RingQueue<CalTask>();
    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, (void *)rq);
    pthread_create(&p, nullptr, product, (void *)rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete rq;
    return 0;
}

多线程多并发执行:

想要实现多线程并发的进行消费和生产,我们只需要保证临界资源的访问安全,因此我们可以在访问临界资源的时候加上锁:

但是这么加锁有一个问题:当我们申请锁成功后才能申请信号量,锁只有一把需要多个线程竞争后才能交付并且申请信号量也需要花费时间。因此我们可以先申请信号量,让多个线程先预定好共享资源中的资源,然后再申请锁进行临界资源的访问,这样做提高了程序的效率。

最终代码:

//RingQueue.hpp

#include <iostream>
#include <semaphore.h>
#include <vector>
#include <string>
#include <cassert>

using namespace std;

static const int gcap = 6;
static const string symbol ="+-*/%";

template <class T>
class RingQueue
{

private:
    void P(sem_t *sem) // 申请信号量 --
    {
        int n = sem_wait(sem);
        assert(n == 0);
        (void)n;
    }

    void V(sem_t *sem) // 释放信号量 ++
    {
        int n = sem_post(sem);
        assert(n == 0);
        (void)n;
    }

public:
    RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap)
    {

        int n = sem_init(&_spaceSem, 0, cap);
        assert(n == 0);
        int m = sem_init(&_dataSem, 0, 0);
        assert(m == 0);
        _productStep = _consumerStep = 0;
        pthread_mutex_init(&_pmutex,nullptr);
        pthread_mutex_init(&_cmutex,nullptr);

    }

    void push(const T &in)
    {
        P(&_spaceSem); // 申请空间信号量
        pthread_mutex_lock(&_pmutex);
        _queue[_productStep++] = in;
        _productStep %= _cap;
        pthread_mutex_unlock(&_pmutex);

        V(&_dataSem); // 释放数据信号量
        

    }

    void pop(T *out)
    {
        
        P(&_dataSem);
        pthread_mutex_lock(&_cmutex);
        *out = _queue[_consumerStep++];
        _consumerStep %= _cap;
        pthread_mutex_unlock(&_cmutex);
        
        V(&_spaceSem);
    
    }

    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);

    }

private:
    std::vector<T> _queue;
    int _cap;
    sem_t _spaceSem;
    sem_t _dataSem;
    int _productStep;
    int _consumerStep;
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};

//Task.hpp

#include <iostream>
#include <functional>
#include <string>

class CalTask
{
public:
    using func_t = std::function<int(int, int, char)>;

    CalTask() {}

    CalTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
    {
    }

    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[64];
        snprintf(buffer, sizeof buffer, "%d %c %d =%d", _x, _op, _y, result);
        return buffer;
    }

    std::string to_string()
    {
        char buffer[64];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

    ~CalTask()
    {
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;

    case '/':
        if (y == 0)
        {
            std::cerr << "div zero error" << std::endl;
            break;
        }
        result = x / y;
        break;

    case '%':
        if (y == 0)
        {
            std::cerr << "mod zero error" << std::endl;
            break;
        }
        result = x % y;
        break;
    }
    return result;
}

//main.cc

#include <iostream>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
#include <unistd.h>
#include "RingQueue.hpp"
#include "Task.hpp"

using namespace std;

string SelfName()
{
    char name[64];
    snprintf(name,sizeof name,"thread[0x%x]",pthread_self());
    return name;
}

void *consumer(void *queue)
{
    RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(queue);
    while (true)
    {
        CalTask result ;
        rq->pop(&result);
        cout << SelfName()<<"消费者处理了一个任务:" << result() << endl;
        sleep(1);
    }
}

void *product(void *queue)
{
    RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(queue);
    while (true)
    {
        int x =rand()%10;
        int y =rand()%10;
        char op =symbol[rand()%symbol.size()];
        CalTask t(x,y,op, mymath);
        rq->push(t);
        cout << SelfName()<<"生产者生产了一个任务:" << t.to_string() << endl;
    }
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123456);
    RingQueue<CalTask> *rq = new RingQueue<CalTask>();
    pthread_t c[10], p[6];
    for(int i =0;i<10;++i)pthread_create(c+i, nullptr, consumer, (void *)rq);
    for(int i =0;i<6;++i)pthread_create(p+i, nullptr, product, (void *)rq);

    for(int i =0;i<10;++i)pthread_join(c[i], nullptr);
    for(int i =0;i<6;++i)pthread_join(p[i], nullptr);
    
    delete rq;
    return 0;
}

这里所说的消费者和生产者模型的高效性和上一节基于阻塞队列的是一样的,如果不清楚的话可以点击《基于阻塞队列的生产和消费模型》进行复习!!!到这里生产者和消费者模型讲解就结束了,下一节我们学习线程池,敬请期待!!!!!记得点赞多多支持喔!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/981873.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jupyter 格式化与快捷键

1、标题&#xff1a; # 一级标题 ## 二级标题 ### 三级标题 2、 加粗文本&#xff1a; **加粗文本** 3、斜体文本&#xff1a; _斜体_ 4、删除线 ~删除线~ 5、高亮文本 高亮文本 6、区块引用 > 我是引用文字 >> 我是第二层 >&g…

提高广播新闻自动语音识别模型的准确性

语音识别技术的存在让机器能够听懂人类的语言&#xff0c;让机器理解人类的语言。语音识别技术发展至今&#xff0c;已经应运而上了各种各样的语音智能助手&#xff0c;可能有一天我们身边的物体都能和我们说话&#xff0c;万物相连的时代也如期而至。 数据从何而来&#xff1…

FPN模型

【简介】 2017年&#xff0c;T.-Y.Lin等人在Faster RCNN的基础上进一步提出了特征金字塔网络FPN(Feature Pyramid Networks)技术。在FPN技术出现之前&#xff0c;大多数检测算法的检测头都位于网络的最顶层(最深层)&#xff0c;虽说最深层的特征具备更丰富的语义信息&#xff0…

Mybatis 动态SQL - 使用foreach标签查询数据、批量新增、批量修改、删除数据

前面我们介绍了使用Mybatis完成数据的增删改查&#xff0c;并且也了解了如何在Mybatis中使用JDK的日志系统打印日志&#xff1b;本篇我们介绍使用Mybatis的动态SQL完成查询数据、批量新增、批量修改、删除数据。 如果您对数据的增删改查操作和Mybatis集成JDK日志系统不太了解&…

基于FPGA的RGB图像转化为灰度图实现,通过MATLAB进行辅助验证

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 vivado2019.2 matlab2022a 3.部分核心程序 timescale 1ns / 1ps // // Company: // Engineer: // // Create Date: 202…

js中添加屏蔽F12 审查元素、屏蔽开发者工具、屏蔽右键菜单、屏蔽剪切、屏蔽选中操作

在看某个网站时&#xff0c;看到一段话想复制一下&#xff0c;结果复制不了。想打开F12看看元素进行复制&#xff0c;也不行&#xff0c;没有反应。最后通过打开开发者工具看看&#xff0c;结果一打开就跳到about:blank。 看到这操作一脸懵逼&#xff0c;小样的&#xff0c;还有…

iPhone15就要到来,iPhone14可能会铺天盖地地降价

智能手机并没有变得更便宜&#xff0c;这就是为什么如果你在购买新设备&#xff0c;值得记住去年的型号。苹果即将推出的iPhone 15系列预计将于本月发布&#xff0c;有传言称9月12日将举行苹果活动。当他们真的宣布时&#xff0c;我们很可能也会看到iPhone 14的价格在iPhone 15…

【项目经验】elementui抽屉(从下到上方向)实现向上拉伸

效果图 直接上代码 <template><div><el-button click"drawerBtn" type"primary" style"margin-left: 16px;">点我打开</el-button><el-drawer title"我是标题" :modal"false" :wrapperClosable…

centos7 下使用docker安装常见的软件:Redis

关于docker的基础知识&#xff0c;请见《别在说自己不知道docker了&#xff0c;全文通俗易懂的给你说明白docker的基础与底层原理》 在自己学习的过程中经常会需要动手安装一下常见的工具&#xff0c;本篇就手把手带你用docker安装一遍。 jdk安装 如果先要更换之前的jdk从第…

汽车级肖特基二极管DSS220-Q 200V 2A

DSS220-Q是什么二极管&#xff1f;贵司有生产吗&#xff1f; 肖特基二极管DSS220-Q符合汽车级AEC Q101标准吗&#xff1f; DSS220-Q贴片肖特基二极管参数是什么封装&#xff1f;正向电流和反向电压是多大&#xff1f; DSS220-Q肖特基二极管需要100KK&#xff0c;有现货吗&#…

LeetCode 48题: 旋转图像

题目 给定一个 n n 的二维矩阵 matrix 表示一个图像。请你将图像顺时针旋转 90 度。 你必须在 原地 旋转图像&#xff0c;这意味着你需要直接修改输入的二维矩阵。请不要 使用另一个矩阵来旋转图像。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]]…

LeetCode刷题笔记【24】:贪心算法专题-2(买卖股票的最佳时机II、跳跃游戏、跳跃游戏II)

文章目录 前置知识122.买卖股票的最佳时机II题目描述贪心-直观写法贪心-优化代码更简洁 55. 跳跃游戏题目描述贪心-借助ability数组贪心-只用int far记录最远距离 45.跳跃游戏II题目描述回溯算法贪心算法 总结 前置知识 参考前文 参考文章&#xff1a; LeetCode刷题笔记【23】…

Ansible-roles学习

目录 一.roles角色介绍二.示例一.安装httpd服务 一.roles角色介绍 roles能够根据层次型结构自动装载变量文件&#xff0c;tasks以及handlers登。要使用roles只需在playbook中使用include指令即可。roles就是通过分别将变量&#xff0c;文件&#xff0c;任务&#xff0c;模块以…

Python爬虫(十八)_多线程糗事百科案例

多线程糗事百科案例 案例要求参考上一个糗事百科单进程案例:https://cloud.tencent.com/developer/article/1021994 Queue(队列对象) Queue是python中的标准库&#xff0c;可以直接import Queue引用&#xff1b;队列时线程间最常用的交互数据的形式。 python下多线程的思考…

0015Java程序设计-springboot美食网站

摘 要目 录**绪论**1.1背景及意义1.2 国内外研究概况1.3 研究的内容 开发环境 摘 要 随着移动应用技术的发展&#xff0c;越来越多的用户借助于移动手机、电脑完成生活中的事务&#xff0c;许多的传统行业也更加重视与互联网的结合。 本论文主要介绍基于java的美食网站&#…

Ubuntu系统自动清理系统内存脚本和使用方法

在使用Ubuntu系统时会出现内存占用太多&#xff0c;系统卡顿现象&#xff0c;有一种shell脚本可以自动清理系统内存&#xff0c;使用方法如下&#xff1a; 1. 新建脚本文件 如 /home/hulk/tools/SysTools/memory-monitor.sh #!/bin/bash# while [[ true ]]; doCOMPILE_TIMEdat…

Yarn资源调度器

文章目录 一、Yarn资源调度器1、架构2、Yarn工作机制3、HDFS、YARN、MR关系4、作业提交之HDFS&MapReduce 二、Yarn调度器和调度算法1、先进先出调度器&#xff08;FIFO&#xff09;2、容量调度器&#xff08;Capacity Scheduler&#xff09;3、公平调度器&#xff08;Fair …

配电室智能运维方案

为提高配电房的智能运维水平&#xff0c;实现智能运维、多端监测、远程控制、用电分析和异常告警等功能&#xff0c;力安科技依托电易云-智慧电力物联网提供了配电室智能运维方案&#xff0c;协助用户监测配电房内的设备运行状态、实现故障实时报警及无人值守&#xff0c;消灭人…

机器学习笔记:node2vec(论文笔记:node2vec: Scalable Feature Learning for Networks)

2016 KDD 1 intro 利用graph上的节点相似性&#xff0c;对这些节点进行embedding 同质性&#xff1a;节点和其周围节点的embedding比较相似 蓝色节点和其周围的节点结构等价性 结构相近的点embedding相近 比如蓝色节点&#xff0c;都处于多个簇的连接处 2 随机游走 2.1 介绍…

vue+antd——table组件实现动态列+表头下拉选择功能——技能提升

Table 表格 展示行列数据。 何时使用 当有大量结构化的数据需要展现时&#xff1b; 当需要对数据进行排序、搜索、分页、自定义操作等复杂行为时。 最近在写vueantd的框架&#xff0c;遇到一个需求&#xff1a;就是要实现table表格的动态列&#xff0c;并且相应的表头要实现下拉…