linux【多线程】基于环形队列(RingQueue)的生产消费模型

news2025/1/8 13:42:58

基于环形队列RingQueue的生产消费模型

  • 一、引入
  • 二、信号量
    • 2.1 信号量概念
    • 2.2 信号量PV操作
    • 2.3 POSIX信号量接口
  • 三、基于环形队列(RingQueue)的生产消费模型
    • 3.1 设计思路
    • 3.2 结构设计图
    • 3.3 单生产单消费代码实现
  • 四、多生产多消费情形
  • 五、小结
    • 5.1 多生产多消费的意义
    • 5.2 条件变量与信号量

一、引入

void push(const T &in)
{
    pthread_mutex_lock(&mutex_);
    while (is_full()) pthread_cond_wait(&pcond_, &mutex_);
    q_.push(in);
    pthread_cond_signal(&ccond_);
    pthread_mutex_unlock(&mutex_);
}

前面说到了基于阻塞队列的生产消费模型中访问临界资源的时候要先去判断是否满足条件;因为我们在操作临界资源的时候,有可能不满足条件,但是我们无法提前知晓,所以只能先加锁再检测,根据检测结果,决定下一步怎么走。

就像上述对资源整体加锁,就默认了对资源整体使用,在其他线程角度,某个线程访问资源就是对整体访问;但是实际情况中可能存在一份公共资源,允许并发的访问不同的区域块!这就不得不提到信号量

二、信号量

2.1 信号量概念

  1. 信号量本质是一把计数器,衡量临界资源中资源数量多少的计数器

  2. 只要拥有信号量,未来一定能够拥有临界资源的一部分。申请信号量的本质:对临界资源中特定小块资源的预定机制。

  3. 信号量是一个计数器加上先申请,这就会令我们预先知道临界资源使用条件,就不需要像阻塞队列一样判断了

线程要进行访问临界资源中的某一区域,先申请信号量,前提是所有线程必须先看到信号量,所以信号量必须是公共资源

2.2 信号量PV操作

信号量:sem_t sem=5;
P操作:sem--,申请操作,必须保证操作的原子性
V操作:sem++,归还资源,必须保证操作的原子性
前面我们提到了锁是共享资源,但它的底层结构设计保证了它的原子性操作,对信号量也适用
信号量的核心操作就是PV原语

2.3 POSIX信号量接口

#include <semaphore.h>
//信号量初始化
int sem_init(sem_t *sem, int pshared, unsigned int value)
//sem:信号量
//pshared:0表示线程间共享,非零表示进程间共享。
//value:信号量初始值(资源数目)。
  
//信号量销毁
int sem_destroy(sem_t *sem)
//信号量等待
int sem_wait(sem_t *sem):p操作,-- 
//信号量发布
int sem_pos(sem_t *sem):V操作,++

三、基于环形队列(RingQueue)的生产消费模型

关于环形队列可参考我的博客环形队列这里不做过多叙述

3.1 设计思路

这里的判满不需要关心front和rear之间留一个位置
生产者和消费者访问同一个位置的情况:空的时候,满的时候;
其他情况下生产者与消费者访问的就是不同的区域了。

在这里插入图片描述
核心的三种条件

  1. 消费者不能超过生产者(没生产怎么消费?)
  2. 生产者不能超过消费者一个圈以上(否则数据会覆盖)
  3. 产者和消费者指向同一个位置时,如果此时满了就让消费者先走,如果此时为空就让生产者先走
  • 对于生产者,需要知道队列中的剩余空间,空间资源定义成一个信号量
  • 对于消费者,需要知道队列中的数据资源,数据资源定义成一个信号量
    先来预热一下伪代码
    在这里插入图片描述
    虽然不知道生产者消费者那个线程先运行,但是先申请成功的只能是生产者!这里需要注意的是,生产者生产之后任务已经在队列中了,需要对消费者的信号量进行++,反之亦然!这样就满足了上述的三种条件

3.2 结构设计图

在这里插入图片描述

3.3 单生产单消费代码实现

//RingQueue.hpp
#pragma once

#include <iostream>
#include <cassert>
#include <vector>
#include <semaphore.h>

static const int gcap=5;

template<class T>
class RingQueue
{
private:
    void P(sem_t& sem)
    {
        int n=sem_wait(&sem);
        assert(n==0);
        (void)n;
    }   
    void V(sem_t& sem)
    {
        int n=sem_post(&sem);
        assert(n==0);
        (void)n;
    } 
public:
    RingQueue(const int& cap=gcap)
    :queue_(cap),cap_(cap)
    {
        //初始化信号量
        int n=sem_init(&spacesem_,0,cap_);
        assert(n==0);
        n=sem_init(&datasem_,0,0);
        assert(n==0);

        pStep_=cStep_=0;
    }
    //生产者
    void push(const T& in)
    {
        P(spacesem_);
        queue_[pStep_++]=in;
        pStep_%=cap_;//避免越界
        V(datasem_);
    }

    //消费者
    void pop(T* out)
    {
        P(datasem_);
        *out=queue_[cStep_++];
        cStep_%=cap_;

        V(spacesem_);
       
    }

    ~RingQueue()
    {
        sem_destroy(&spacesem_);
        sem_destroy(&datasem_);
    }
private:
    std::vector<T> queue_;
    int cap_;
    sem_t datasem_;//消费者--数据资源
    sem_t spacesem_;//生产者--空间资源
    int pStep_;//生产者数组下标
    int cStep_;//消费者数组下标

};
//Task.hpp
#pragma once

#include <iostream>
#include <cstdio>
#include <cstring>
#include <functional>
class Task//计算任务
{

public:
    using func_t =std::function<int(int,int,char)>;
    Task(){}
    Task(int x,int y,char op,func_t callback)
        :x_(x),y_(y),op_(op),callback_(callback)
    {

    }
    std::string operator()()
    {
        int result=callback_(x_,y_,op_);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d=%d",x_,op_,y_,result);
        return buffer;
    }

    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d=?",x_,op_,y_);
        return buffer;
    }
private:
    int x_;
    int y_;
    char op_;
    func_t callback_;
};
const std::string oper = "+-*/%"; 
int mymath(int x,int y,char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
        break;
    default:
        break;
    }
    return result;
}
//main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>

using std::cout;
using std::endl;
void* ProductorRoutine(void* args)
{
    RingQueue<Task>* rq=static_cast<RingQueue<Task>*>(args);
    while (true)
    {
        //任务1
        // int data=rand()%10+1;
        // rq->push(data);
        // cout<<"生产完成"<<data<<endl;
        // sleep(1);

        //任务2
        //获取任务
        int x=rand() % 10 ;
        int y=rand() % 5;
        char op=oper[rand()%oper.size()];
        Task t(x,y,op,mymath);
        //生产任务
        rq->push(t);
        //输出提示
        cout<<"生产者派发了一个任务: "<<t.toTaskString()<<endl;
        sleep(1);
    }
    
}
void* ComsumerRoutine(void* args)
{
    RingQueue<Task>* rq=static_cast<RingQueue<Task>*>(args);
    while (true)
    {
        //任务1
        // int data;
        // rq->pop(&data);
        // cout<<"消费完成"<<data<<endl;

        //任务2
        Task t;
        //消费任务
        rq->pop(&t);
        std::string result=t();
        cout<<"生产者派发了一个任务: "<<result<<endl;

    }
    
}
int main()
{
    srand((size_t)time(0)^getpid()^0x22113121);
    RingQueue<Task>* rq=new RingQueue<Task>();
    pthread_t c,p;
    pthread_create(&p,nullptr,ProductorRoutine,rq);
    pthread_create(&c,nullptr,ComsumerRoutine,rq);

    pthread_join(p,nullptr);
    pthread_join(c,nullptr);
    return 0;
}

代码逻辑
main函数创建两个线程,生产者线程负责生产出任务参数并构建任务对象,再把任务添加到RingQueue里面;消费者线程负责将任务从RingQueue里面取出来并调用Task的仿函数计算任务并打印

四、多生产多消费情形

无论是阻塞队列还是环形队列,只要保证最终进入临界区的是一个生产,一个消费就行,所以需要在环形队列对push与pop加锁。
需要加两把锁,一个是生产线程的锁,一个是消费线程的锁,你拿你的,我拿我的

#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>

using std::cout;
using std::endl;

std::string SelfName()
{
    char name[128];
    snprintf(name,sizeof name,"thread[0x%x]",pthread_self());
    return name;
}

void* ProductorRoutine(void* args)
{
    RingQueue<Task>* rq=static_cast<RingQueue<Task>*>(args);
    while (true)
    {
        //任务1
        // int data=rand()%10+1;
        // rq->push(data);
        // cout<<"生产完成"<<data<<endl;
        // sleep(1);

        //任务2
        //获取任务
        int x=rand() % 10 ;
        int y=rand() % 5;
        char op=oper[rand()%oper.size()];
        Task t(x,y,op,mymath);
        //生产任务
        rq->push(t);
        //输出提示
        cout<< SelfName()<<",派发了一个任务: "<<t.toTaskString()<<endl;
        sleep(1);
    }
    
}
void* ComsumerRoutine(void* args)
{
    RingQueue<Task>* rq=static_cast<RingQueue<Task>*>(args);
    while (true)
    {
        //任务1
        // int data;
        // rq->pop(&data);
        // cout<<"消费完成"<<data<<endl;

        //任务2
        Task t;
        //消费任务
        rq->pop(&t);
        std::string result=t();
        cout<< SelfName()<<",消费了一个任务: "<<result<<endl;

    }
    
}


int main()
{
    srand((size_t)time(0)^getpid()^0x22113121);
    RingQueue<Task>* rq=new RingQueue<Task>();
    //多生产多消费
    pthread_t p[5],c[10];
    for(int i=0;i<5;i++) pthread_create(p+i,nullptr,ProductorRoutine,rq);
    for(int i=0;i<10;i++) pthread_create(c+i,nullptr,ComsumerRoutine,rq);
    
    
    for(int i=0;i<5;i++) pthread_join(p[i],nullptr);
    for(int i=0;i<10;i++) pthread_join(c[i],nullptr);

    delete rq;

    return 0;
}
#pragma once

#include <iostream>
#include <cassert>
#include <vector>
#include <semaphore.h>

static const int gcap=5;

template<class T>
class RingQueue
{
private:
    void P(sem_t& sem)
    {
        int n=sem_wait(&sem);
        assert(n==0);
        (void)n;
    }   
    void V(sem_t& sem)
    {
        int n=sem_post(&sem);
        assert(n==0);
        (void)n;
    } 
public:
    RingQueue(const int& cap=gcap)
    :queue_(cap),cap_(cap)
    {
        //初始化信号量
        int n=sem_init(&spacesem_,0,cap_);
        assert(n==0);
        n=sem_init(&datasem_,0,0);
        assert(n==0);

        pStep_=cStep_=0;
        
        pthread_mutex_init(&pmutex_,nullptr);
        pthread_mutex_init(&cmutex_,nullptr);
    }
    //生产者
    void push(const T& in)
    {
        P(spacesem_);
        //注意!把锁加在信号量里面
        pthread_mutex_lock(&pmutex_);
        queue_[pStep_++]=in;
        pStep_%=cap_;//避免越界
        pthread_mutex_unlock(&pmutex_);
        V(datasem_);
    }

    //消费者
    void pop(T* out)
    {
        P(datasem_);

        pthread_mutex_lock(&cmutex_);
        *out=queue_[cStep_++];
        cStep_%=cap_;
        pthread_mutex_unlock(&cmutex_);

        V(spacesem_);
       
    }

    ~RingQueue()
    {
        sem_destroy(&spacesem_);
        sem_destroy(&datasem_);

        pthread_mutex_destroy(&pmutex_);
        pthread_mutex_destroy(&cmutex_);

    }
private:
    std::vector<T> queue_;
    int cap_;
    sem_t datasem_;//消费者--数据资源
    sem_t spacesem_;//生产者--空间资源
    int pStep_;//生产者数组下标
    int cStep_;//消费者数组下标

    pthread_mutex_t pmutex_;
    pthread_mutex_t cmutex_;

};

在这里插入图片描述
关于加锁问题,可以在信号量之前加锁也可以再其之后加锁,在其之前加锁就导致线程申请不到锁只能等待,而在其之后加锁可以保证线程先去申请信号量,如果申请失败就在信号量处阻塞,信号量申请成功再去申请锁,提高效率。

五、小结

5.1 多生产多消费的意义

上述多线程代码中,最少有一个线程进入队列,最多有两个线程进入,那么多生产多消费的意义在哪?

不管是环形队列还是阻塞队列,多生产多消费的意义在于获取任务、处理任务是要花时间的,不仅仅是放进去、拿出来就行了。所以多生产多消费的时候的意义在于可以在生产之前与消费之后让线程并行执行。

5.2 条件变量与信号量

条件变量是一种用于线程同步和通信的高级机制。条件变量需要和互斥量一起使用,来保证线程同步和避免竞态条件。

信号量是一种用于多线程间同步的基本机制。它是一个计数器,用于控制多个线程对共享资源的访问。

如果想让一个线程完整的访问共享资源,可以使用条件变量。但是如果需要多个线程并发访问共享资源的不同区域,则可以使用信号量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/484556.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker容器技术

什么是docker Docker 使用 Google 公司推出的 Go 语言 进行开发实现&#xff0c;基于 Linux 内核的 cgroup&#xff0c;namespace&#xff0c;以及 OverlayFS 类的 Union FS 等技术&#xff0c;对进程进行封装隔离&#xff0c;属于 操作系统层面的虚拟化技术。由于隔离的进程独…

PCL学习三:KD-Tree Octree

参考引用 Point Cloud Library黑马机器人 | PCL-3D点云【量化课堂】KD-Tree系列KD-Tree原理详解 1. 引言 通过激光雷达或双目相机获取到的点云&#xff0c;一般数据量较大且分布不均匀&#xff0c;数据主要表征了目标物表面的大量点的集合&#xff0c;这些离散的点如果希望实现…

七彩虹DDR5主板四根开启XMP教程

七彩虹的D5主板&#xff0c;通常在安装8G2、16G2的情况下是可以正常开启XMP的&#xff0c; 但在16G4、32G2的情况下机会出现XMP超频失败或蓝屏问题 这是由于BIOS优化不足造成的 第一步还是开启XMP&#xff0c;也可以在选择XMP后&#xff0c;切换到用户自定义模式&#xff0c…

Flutter 与第三方 Native-SDK 的交互代理方案

场景 在使用 Flutter 进行功能模块或者整体项目的开发时&#xff0c;如果需要&#xff08;阶段性&#xff09;频繁地和某个第三方 Native-SDK 进行交互&#xff0c;而该 Native-SDK 没有实现 Flutter 插件版本的情况下&#xff0c;如果直接把这部分交互 API 加入到原有的 chan…

最优化方法Python计算:一元函数搜索算法——二分法

设一元目标函数 f ( x ) f(x) f(x)在区间 [ a 0 , b 0 ] ⊆ R [a_0,b_0]\subseteq\text{R} [a0​,b0​]⊆R&#xff08;其长度记为 λ \lambda λ&#xff09;上为单峰函数&#xff0c;且在 ( a 0 , b 0 ) (a_0,b_0) (a0​,b0​)内连续可导&#xff0c;即其导函数 f ′ ( x ) f…

TinyJAMBU的制动原理——一种轻量化的认证密码

关于TinyJAMBU的定义和介绍在另一篇博文已经介绍过了&#xff0c;这里只对其动作原理进行描述和说明。 对应的博文链接如下&#xff1a;TinyJAMBU&#xff1a;一种轻量化密码介绍 首先&#xff0c;该密码是一个流密码体系的块密码框架。其加密模式整体上来看是块密码&#xff0…

P4903 心碎

这是一道洛谷里的题目 难度级别&#xff1a;普及/提高- 题目提供者&#xff1a;大神cyd 题目背景 NOIP2015初赛。CYD大神在他的母校跪掉了。初赛那天&#xff0c;他回到原来的教室参观了一下&#xff0c;发现那张遍布了N个洞的课桌还摆在那里——那是他初中时和XHY同学坐过…

优思学院|8D和DMAIC两种方法应如何选择?

在现代的商业环境中&#xff0c;客户投诉是一个非常常见的问题。当客户不满意产品或服务时&#xff0c;他们往往会向企业发出投诉。质量管理部门是一个负责处理这些投诉的重要部门&#xff0c;因为它们需要确保产品和服务的质量满足客户的期望。改善方法是质量管理部门用来解决…

作为一名程序员,如何写出一手让同事膜拜的漂亮代码?

整洁的代码 有意义的命名 函数命名 变量命名 函数的定义 注释的规范 代码的长度 代码的对齐 我写代码已经有好几年了&#xff0c;最近看了一本书叫做《代码整洁之道》。我发现这本书中介绍的一些内容对我来说非常有启发性。书中提到的一些方法和技巧让我重新审视了自己的…

疑难问题定位案例复盘(三)

今天我们分享一个数据库被异常改写的案例&#xff0c;通过该案例我们可以学习总结出常规的文件被改写问题定位思路。 问题现象 1、测试环境在进行特定压力测试时发现页面登陆异常&#xff0c;且调试日志多个进程持续打印“数据库打开失败”日志。 2、测试环境在进行多个压力测…

Unity教程||Unity添加中文字体||Unity知识记录--制作UI粒子特效

Unity添加中文字体 ## 1、拷贝字体文件 拷贝C:\Windows\Fonts文件夹下&#xff0c;华文细黑常规文件到项目中 ## 2、下载中文字库 链接: https://pan.baidu.com/s/1KW31KB5vEImZHUWvQ9PLEQ 提取码: bgug 3、添加字体字库 选择Window->TextMeshPro->Font Asset Crea…

HTML的基础语法

文章目录 前言一.HTML结构1.1 什么是html语言1.2 html的结构是什么 二.HTML常见标签2.1 文本格式标签标题标签段落标签换行标签格式化标签图片标签超链接标签 2.2 表格标签2.3 表单标签from标签input标签文本框密码框单选框复选框普通按钮提交按钮清空按钮选择文件select标签te…

延迟队列与SpringBoot实战

延迟队列与SpringBoot实战 概念 延时队列,队列内部是有序的&#xff0c;最重要的特性就体现在它的延时属性上&#xff0c;延时队列中的元素是希望在指定时间到了以后或之前取出和处理&#xff0c;简单来说&#xff0c;延时队列就是用来存放需要在指定时间被处理的元素的队列 …

基于jQuery------购物车案例

目录 基于jQuery------购物车案例 案例&#xff1a;购物车案例模块-增减商品数量分析 案例&#xff1a;购物车案例模块-修改商品小计分析 案例&#xff1a;购物车案例模块-计算总计和总额 案例&#xff1a;购物车案例模块-删除商品模块 案例&#xff1a;购物车案例模块-选…

从‘discover.partitions‘=‘true‘分析Hive的TBLPROPERTIES

从’discover.partitions’true’分析Hive的TBLPROPERTIES 前言 Hive3.1.2先建表&#xff1a; show databases ;use db_lzy;show tables ;create external table if not exists test_external_20230502(id int,comment1 string,comment2 string ) stored as parquet ;creat…

C语言通过控制台命令行传入参数

Linux 与 windows运行c语言程序 切换到对应目录下 1. gcc hello.c -o hello 2.Linux: ./hello Windows: hello.exe int main(){}默认无参数 但在一些情况下想要直接通过在上述过程中第二步就传入参数而不是使用scanf..之类的输入语句就需要使用有参数的main方法: int main() {…

Docker--harbor私有库部署与管理

目录 一、本地私有仓库 搭建本地私有仓库 Docker容器的重启策略 二、Harbor 1、什么是Harbor 2、Harbor特性 3、Harbor的构成 三、Harbor部署 实验步骤 1、安装Docker-Compose服务 2、部署Harbor服务 1、下载或上传Harbor安装程序 2、修改Harbor安装的配置文件 3、…

基于TI板MSP430 玩转PID

文章目录 前言一、整体框架二、PID算法1. 位置式PID2. 增量式PID3. 比例外置式PID4. 积分限幅、输出限幅和PID参数整定5. 位置式PID和增量式PID的区别及抉择 三、初值获取1. 定时器输入捕获2. 外部中断3. ADC采样 前言 具体啥是PID&#xff0c;我这里不做介绍&#xff0c;网上…

SpringMVC(后)SSM整合

10、文件上传和下载 10.1、文件下载 ResponseEntity用于控制器方法的返回值类型&#xff0c;该控制器方法的返回值就是响应到浏览器的响应报文 使用ResponseEntity实现下载文件的功能 RequestMapping("/testDown") public ResponseEntity<byte[]> testResp…

【Hello Algorithm】复杂度 二分法

作者&#xff1a;小萌新 专栏&#xff1a;算法 作者简介&#xff1a;大二学生 希望能和大家一起进步 本篇博客简介&#xff1a;介绍算法的复杂度 对数器和二分法 复杂度 对数器 二分法 复杂度常数时间操作非常数时间操作时间复杂度空间复杂度 二分法有序数组中找一个值寻找有序…