Linux线程:基于环形队列RingQueue的生产消费者模型

news2025/1/12 17:26:07

目录

一、环形队列的概念及定义

二、POSIX信号量

三、RingQueue的实现方式

3.1RingQueue.hpp的构建

3.2Thread.hpp

3.3Main.cc主函数的编写

3.4Task.hpp function包装器的使用


一、环形队列的概念及定义

此处的环形队列并不是一个真正意义上的环,而是通过对容量的取模操作来实现环状,消费者和生产者除了此队列为空或为满,其余情况下生产者和消费者都不会相遇,生产者一定会比消费者先走,因为初始时队列为空,消费者没有消费对象,所以一定是生产者进行生产。

此时环形队列遵循两个原则:

1.生产者不能将消费者套一个圈。

2.消费者不能超过生产者。

所以当consumer和productor处在同一位置时,只可能是以下两种情况:队列为空或者队列为满,

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。

二、POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于 线程间同步。  

#include <semaphore.h> 
int sem_init(sem_t *sem, int pshared, unsigned int value); 
参数: 
 pshared:值为0表示线程间共享,非零表示进程间共享 
 value:信号量初始值 

我们可以将其理解为一个计数器,value是初始值,对sem做初始化。

销毁信号量

int sem_destroy(sem_t *sem); 

等待信号量

功能:等待信号量,会将信号量的值减1 
int sem_wait(sem_t *sem); //P() 

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。 
int sem_post(sem_t *sem);//V() 

三、RingQueue的实现方式

3.1RingQueue.hpp的构建

将和环形队列相关的控制方法进行封装,通过模板传入Thread模板之中,之后每个线程都能看到环形队列的相关方法及规则,从而更好的对所有的线程进行管理,依旧是遵循Linux中的先描述,再组织。

#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <pthread.h>    
#include <semaphore.h>

// 单生产,单消费
// 多生产,多消费
// "321":
// 3: 三种关系
// a: 生产和消费互斥和同步
// b: 生产者之间:
// c: 消费者之间:
// 解决方案:加锁
// 1. 需要几把锁?2把
//一把锁维护消费者和消费者之间的关系,另一把维护生产者和生产者之间的关系
//而生产者和消费者之间的关系则可以通过信号量来进行协调
template<typename T>
class RingQueue
{
private:
    void P(sem_t &sem)
    {
        //等待信号量,将该信号量-1
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        //发布信号量,将该信号量+1
        sem_post(&sem);
    }
    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(int cap):
    _ring_queue(cap),
    _cap(cap),
    _productor_step(0),
    _consumer_step(0)
    {
        sem_init(&_room_sem,0,_cap);//刚开始生产者可生产空间为_cap
        sem_init(&_data_sem,0,0);


        pthread_mutex_init(&_productor_mutex,nullptr);
        pthread_mutex_init(&_consumer_mutex,nullptr);
    }

    //生产
    void Enqueue(const T &in)
    {
        //生产行为
        P(_room_sem);//生产者可用空间--
        Lock(_productor_mutex);

        //一定有空间
        _ring_queue[_productor_step++]=in;
        _productor_step%=_cap;
        Unlock(_productor_mutex);
        V(_data_sem);//消费者可消费data++,去通知此时在_data_sem等待的消费者,信号量不为0就会进行唤醒
    }

    //消费
    void Pop(T* out)
    {
        //消费行为
        P(_data_sem);//在竞争锁之前先申请信号量,
        Lock(_consumer_mutex);
        *out=_ring_queue[_consumer_step++];//拿到队列中的任务
        _consumer_step %= _cap;  
        Unlock(_consumer_mutex);
        V(_room_sem);

    }
    ~RingQueue()
    {
        sem_destroy(&_room_sem);
        sem_destroy(&_data_sem);

        pthread_mutex_destroy(&_productor_mutex);
        pthread_mutex_destroy(&_consumer_mutex);
    }
private:
    //1.构造环形队列
    std::vector<T> _ring_queue;
    int _cap;//环形队列的容量上限

    //2.生产消费者的下标
    int _productor_step;
    int _consumer_step;

    //3.定义信号量
    sem_t _room_sem;//生产者关心
    sem_t _data_sem;//消费者关心

    //4.定义锁 维护多生产多消费之间的互斥关系
    pthread_mutex_t _productor_mutex;
    pthread_mutex_t _consumer_mutex;
};

3.2Thread.hpp

Thread.hpp的详细实现方式已经在之前的博客中做过详细解读和解析。

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    template<typename T>
    using func_t=std::function<void(T&,std::string name)>;

    template<typename T>
    class Thread
    {
    public:
    void Excute()
    {
        _func(_data,_threadname);
    }
    public:
    Thread(func_t<T> func,T&data,const std::string& name="none-name")
    :_func(func),_data(data),_threadname(name),_stop(true)
    {}
    static void* threadroutine(void* args)//static成员函数没有this
    {
        Thread<T> *self = static_cast<Thread<T> *>(args);
        self->Excute();
        return nullptr;
    }
    bool Start()
    {
        int n=pthread_create(&_tid,nullptr,threadroutine,this);//把this传给threadroutine让其完成调用
        if(!n)
        {
            _stop=false;
            return true;
        }
        else
        {
            return false;
        }
    }
    void Detach()
    {
        if(!_stop)
        {
            pthread_detach(_tid);
        }
    }
    void Join()
    {
        if(!_stop)
        {
            pthread_join(_tid,nullptr);
        }
    }
    std::string name()
    {
        return _threadname;
    }
    void Stop()
    {
        _stop = true;
    }
    ~Thread()
    {}
    private:
    pthread_t _tid;
    std::string _threadname;
    T& _data;//要传入所执行函数的参数
    func_t<T> _func;//线程要执行的函数
    bool _stop;
    };
}
#endif

3.3Main.cc主函数的编写

在main函数中,使用了三层封装,更加清晰的梳理了环形队列的使用过程和对生产消费者两方的控制,以及对ringqueue.hpp和thread.hpp的调用和联动。

#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include <ctime>


//我们需要的是向队列中投递任务
using namespace ThreadModule;
using ringqueue_t=RingQueue<Task>;


void Consumer(ringqueue_t &rq,std::string name)
{
    while(true)
    {
        sleep(2);
        //1 消费任务
        Task t;
        rq.Pop(&t);
        //t拿到队列中的任务
        std::cout<<"Consumer handler task: "<<"["<<name<<"]"<<std::endl;
        //2处理任务
        t();
    }
}

void Productor(ringqueue_t &rq,std::string name)
{
    //srand(time(nullptr)^pthread_self());

    while(true)
    {
        rq.Enqueue(Download);
        std::cout<<"Productor: "<<"["<<name<<"]"<<std::endl;
    }
}

void InitComm(std::vector<Thread<ringqueue_t>> *threads,int num,ringqueue_t &rq,func_t<ringqueue_t> func,const std::string &who)
{
    for(int i=0;i<num;i++)
    {
        std::string name="thread-"+std::to_string(i+1)+"-"+who;
        threads->emplace_back(func,rq,name);
        //threads->back()->Start();
        //为什么不直接start:
        //1.直接start会转到thread中去调用pthread_create
    }
}
void InitConsumer(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
    InitComm(threads, num, rq, Consumer, "consumer");
}

void InitProductor(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
    InitComm(threads, num, rq, Productor, "productor");
}

void WaitAllThread(std::vector<Thread<ringqueue_t>> &threads)
{
    for(auto &thread:threads)
    {
        thread.Join();
    }
}
void StartAll(std::vector<Thread<ringqueue_t>> &threads)
{
    for(auto &thread:threads)
    {
        std::cout<<"start: "<<thread.name()<<std::endl;
        thread.Start();
    }
}
int main()
{
    ringqueue_t *rq=new ringqueue_t(10);
    std::vector<Thread<ringqueue_t>> threads;
    
    InitProductor(&threads,1,*rq);
    InitConsumer(&threads,1,*rq);

    StartAll(threads);

    WaitAllThread(threads);
    return 0;
}

3.4Task.hpp function包装器的使用

Task是一个function<void()>的类型,也就是说用Task实例化出的模板可以接收任意类型的函数方法(也就是生产消费者模型中的任务)这样就最大的实现了来什么执行什么,大大提高了代码的灵活性可拓展性。

#pragma

#include <iostream>
#include <functional>

using Task=std::function<void()>;


void Download()
{
    std::cout<<"this is adownload task"<<std::endl;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1862664.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何集成CppCheck到visual studio中

1.CPPCheck安装 在Cppcheck官方网站下载最新版本1.70&#xff0c;官网链接&#xff1a;http://cppcheck.sourceforge.net/ 安装Cppcheck 2.集成步骤 打开VS&#xff0c;菜单栏工具->外部工具->添加&#xff0c;按照下图设置&#xff0c;记得勾选“使用输出窗口” 2.…

AcWing算法基础课笔记——状态压缩DP:蒙德里安的梦想

状态压缩DP 状态是整数&#xff0c;但把它看成二进制数&#xff0c;二进制中每一位是0或1表示不同的情况。 蒙德里安的梦想 291. 蒙德里安的梦想 - AcWing题库 题目 求把 NM&#x1d441;&#x1d440; 的棋盘分割成若干个 1212 的长方形&#xff0c;有多少种方案。 例如…

算法训练营day19--530.二叉搜索树的最小绝对差+501.二叉搜索树中的众数+236. 二叉树的最近公共祖先

一、530.二叉搜索树的最小绝对差 题目链接&#xff1a;https://leetcode.cn/problems/minimum-absolute-difference-in-bst/ 文章讲解&#xff1a;https://programmercarl.com/0530.%E4%BA%8C%E5%8F%89%E6%90%9C%E7%B4%A2%E6%A0%91%E7%9A%84%E6%9C%80%E5%B0%8F%E7%BB%9D%E5%AF…

Lobe Chat openai claude

claude-3-5-sonnet-20240620 $ docker run -d -p 3210:3210 \-e OPENAI_API_KEYsk-xxxx \-e OPENAI_PROXY_URLhttps://api-proxy.com/v1 \-e ACCESS_CODElobe66 \--name lobe-chat \lobehub/lobe-chatDocker 部署 更新 docker ps CONTAINER ID IMAGE …

【Playwright+Python】手把手带你写一个自动化测试脚本

如何使用代理方式打开网页 在 playwright.chromium.launch() 中传入 proxy 参数即可&#xff0c;示例代码如下&#xff1a; 1、同步写法&#xff1a; from playwright.sync_api import sync_playwrightproxy {server: http:/127.0.0.1:8080}def run():with sync_playwright(…

18.枚举

学习知识&#xff1a;枚举类型、相关的使用方法 Main.java&#xff1a; public class Main {public static void main(String[] args) {myenum[] colorlist myenum.values();//获取枚举中所有对象的引用数组for (myenum one : colorlist){System.out.println(one.toString(…

fork 是一个创建新进程的系统调用

在计算机科学中&#xff0c;fork 是一个创建新进程的系统调用。具体来说&#xff0c;fork 调用会创建一个与当前进程几乎完全相同的副本&#xff0c;包括父进程的内存布局、环境变量、打开的文件描述符等。这个新的进程被称为子进程&#xff0c;而原始进程被称为父进程。 以下…

汇聚荣做拼多多电商怎么运营?

在探讨如何有效运营拼多多电商平台时&#xff0c;我们需要首先明确一个核心问题&#xff1a;如何在竞争激烈的市场中突出重围&#xff0c;吸引并留住消费者。接下来&#xff0c;我们将从五个方面深入剖析这一核心内容&#xff0c;确保每一步都精准有效&#xff0c;以白话文的形…

ATFX汇市:加拿大5月CPI数据来袭,USDCAD逼近关键点位

ATFX汇市&#xff1a;今日20:30&#xff0c;加拿大统计局将公布5月未季调CPI年率&#xff0c;前值为2.7%&#xff0c;预期值2.6%&#xff0c;预期将下降0.1个百分点&#xff1b;同一时间公布的还有加拿大5月核心CPI年率&#xff0c;前值为1.6%&#xff0c;低于2%的温和通胀标准…

【Chapter7】虚拟存储系统,计算机操作系统教程,第四版,左万利,王英

文章目录 [toc]零、前言一、外存资源管理1.1 外存空间划分1.2 外存空间分配1.2.1 空闲块链(慢)1.2.2 空闲块表(UNIX)1.2.3 字位映像图 1.3 进程与外存对应关系 二、虚拟页式存储系统2.1 基本原理2.2 内存页框分配策略2.3 外存块的分配策略2.4 页面调入时机2.5 置换算法2.5.1 最…

Transformer的诞生和崛起

智能问答与文本生成&#xff1a;Transformer模型的超能力 ©作者|wy 来源|神州问学 一、引言 NLP&#xff08;自然语言处理&#xff09;作为人工智能领域的一个重要分支&#xff0c;致力于使计算机能够理解和处理人类语言。随着互联网的发展和信息时代的到来&#xff0c;…

docker 容器设置中文环境

1.容器中安装和设置 1.1.进入容器查看已有语言包 locale -a 默认情况下&#xff1a; 1.2 安装中文语言环境 如果没有zh_CN.utf8就安装。 方式1&#xff1a; #直接安装中文语言包 apt-get install -y language-pack-zh-hans 方式2&#xff1a; #安装中文语言环境 apt-g…

系统初始化进程与文件、systemd概述、单元类型、切换运行级别、查看系统默认默认运行、永久切换、常见的系统服务(centos)

init进程 init进程是Linux系统&#xff08;“/sbin/init”&#xff09;中的第一个进程&#xff0c;它是所有其他进程的祖先进程。init进程的进程号&#xff08;PID&#xff09;始终为1。它负责启动和停止系统中的所有其他进程&#xff0c;以及处理系统的各种系统级任务。 ini…

大模型系列之被我忽视的Assistants API

前言 在这篇文章中&#xff0c;我们提到在GPT4.0 turbo发布时&#xff0c;GPTs和Assistants API的出现使得众多创业者一夜无眠。当时看完之后就被我丢到一边&#xff0c;并没有太多关注&#xff0c;随着我们对RAG和Agent的不断深入了解&#xff0c;蓦然回首&#xff0c;越发感…

HarmonyOS Next开发学习手册——通过startAbility拉起文件处理类应用

使用场景 开发者可以通过调用startAbility接口&#xff0c;由系统从已安装的应用中寻找符合要求的应用来实现打开特定文件的意图&#xff0c;例如&#xff1a;浏览器下应用下载PDF文件&#xff0c;可以调用此接口选择文件处理应用打开此PDF文件。开发者需要在请求中设置待打开…

Redis-实战篇-什么是缓存-添加redis缓存

文章目录 1、什么是缓存2、添加商户缓存3、前端接口4、ShopController.java5、ShopServiceImpl.java6、RedisConstants.java7、查看Redis Desktop Manager 1、什么是缓存 缓存就是数据交换的缓冲区&#xff08;称为Cache&#xff09;&#xff0c;是存贮数据的临时地方&#xff…

Linux 异步 I/O 框架 io_uring:基本原理、程序示例与性能压测

Linux 异步 I/O 框架 io_uring 前言Linux I/O 系统调用演进io_uring与 Linux AIO 的不同原理及核心数据结构&#xff1a;SQ/CQ/SQE/CQE带来的好处三种工作模式io_uring 系统调用 API 前言 io_uring 是 2019 年 Linux 5.1 内核首次引入的高性能 异步 I/O 框架&#xff0c;能显著…

【干货】Jupyter Lab操作文档

Jupyter Lab操作文档1. 使用须知2. 定制化Jupyter设置主题显示代码行数设置语言更多设置 3. 认识Jupyter界面4. 初用Jupyter运行调试格式化查看源码 5. 使用Jupyter Terminal6. 使用Jupyter Markdown7. 上传下载文件&#xff08;云服务器中的Jupyter Lab&#xff09;上传文件到…

SAP的RFID

射频识别 &#xff08;RFID&#xff09; 避免了条码扫描的局限性&#xff0c;条码扫描需要对每个条码进行视线访问&#xff0c;并且一次只能用于扫描一个项目。 一次扫描一个标签可能会令人厌烦和压力大&#xff0c;这会增加人为错误的机会。相反&#xff0c;RFID 标签不需要直…