【Linux】基于环形队列RingQueue的生产消费者模型

news2025/1/12 2:48:25

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

目录

前言

环形队列的概念及定义

POSIX信号量

RingQueue的实现方式

RingQueue.hpp的构建

Thread.hpp

Main.cc主函数的编写

Task.hpp function包装器的使用

总结



前言

世上有两种耀眼的光芒,一种是正在升起的太阳,一种是正在努力学习编程的你!一个爱学编程的人。各位看官,我衷心的希望这篇博客能对你们有所帮助,同时也希望各位看官能对我的文章给与点评,希望我们能够携手共同促进进步,在编程的道路上越走越远!


提示:以下是本篇文章正文内容,下面案例可供参考

环形队列的概念及定义

  • 环形队列采用数组模拟,用模运算来模拟环状特性

  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来 判断满或者空。另外也可以预留一个空的位置,作为满的状态

  • 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程

为什么判断阻塞队列为空为满时,要在我们对应的加锁和解锁之间呢?

  • 判断阻塞队列是否为满,本身就是对阻塞队列内部的成员属性做访问、做比较,如果判断在加锁和解锁的区间之外判断时,可能会出现pop和push的操作,会导致并发访问出问题,判断也就不准了。加锁是对内部资源(阻塞队列)进行整体使用的,虽然对阻塞队列保护起来了,但是对阻塞队列的使用情况,我们在锁这里看不出来,我们只能证明当前阻塞队列有人想要使用它,但是队列中的情况是不清楚的。

当队列为空的时候,生产者和消费者的下标是同一个位置;

当队列为满的时候,生产者和消费者下标是用同一个位置。

环形队列判空盘满的时候,难度有点大,因为生产者和消费者在队列为空为满时,都是用同一个位置,所以

  1. 方法一:在环形队列中引入了一个元素的计数器count,count==0时,为空;count==N时,为满;
  2. 方法二:当生产者下标加1等于消费者下标,则说明队列为满。

环形队列中共分三种情况:

生产者和消费者下标为同一个位置:

  • 队列满:当队列为满时,生产者就不能在盘子中放入数据了,否则会覆盖之前的数据;所以要访问临界资源,要让消费者先跑。
  • 队列空:当队列为空时,必须保证要生产者先跑,因为生产者和消费者为空,指向同一个位置,那么这个位置就是一个局部性的临界资源,不能让两者同时跑,否则会出现二义性(盘子和苹果),所以要保证两者互斥,其次必须生产者先跑。

通过队列为空为满这两种情况得出结论:生产者不能把消费者套一个圈;消费者不能超过生产者。

生产者和消费者的下标不是同一个位置:

  • 队列一定不为空&&队列一定不为满

生产者和消费者的下标不在同一个位置,就意味着生产者和消费者的动作,可以真正的并发。

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

我们在电影院中买票,把票买到了,(申请信号量成功了)能证明电影院中一定有资源给我。所以当我申请信号量成功之后,我根本就不用判断这里面的资源是否满足我的条件,所以信号量是一把计数器,这个计数器是用来衡量资源数目的,只要申请成功,就一定会有对应的资源提供给你,从而有效减少内部的判断!!!

在进入临界资源之前,申请信号量时,就已经知道要的资源有还是没有了;而阻塞队列加锁那里,我们还要在对应的临界区里做判断。

定义一个信号量跟定义一个整型一样。

理解:信号量内部维护了一个计数器和队列

初始化信号量

#include <semaphore.h> 
int sem_init(sem_t *sem, int pshared, unsigned int value); 
参数: 
 pshared:值为0表示线程间共享,非零表示进程间共享 
 value:信号量初始值 

销毁信号量

int sem_destroy(sem_t *sem); 

等待信号量

功能:等待信号量,会将信号量的值减1 
int sem_wait(sem_t *sem); //P() 
等待信号量:对指定信号量的计数器做--,
减之前先判断:信号量值是否大于0,大于0,继续往后走;小于0,则阻塞

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。 
int sem_post(sem_t *sem);//V() 
发布信号量:对指定信号量的计数器做++

RingQueue的实现方式

RingQueue.hpp的构建

将和环形队列相关的控制方法进行封装,通过模板传入Thread模板之中,之后每个线程都能看到环形队列的相关方法及规则,从而更好的对所有的线程进行管理,依旧是遵循Linux中的先描述,再组织。

#pragma once
// 环形队列
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>

// 单生产,单消费
// 多生产,多消费
// "321":
// 3: 三种关系
// a: 生产和消费互斥和同步
// b: 生产者之间:一把锁
// c: 消费者之间:一把锁
// b和c的解决方案:加锁,是因为下标只有一个
// 1. 需要几把锁?2把
// 2. 如何加锁?
template<typename T>
class RingQueue
{
private:
    void P(sem_t& sem)
    {
        sem_wait(&sem);// -1
    }
    void V(sem_t& sem)
    {
        sem_post(&sem);// +1
    }
    // 加锁和解锁的接口封装
    void Lock(pthread_mutex_t& mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t& mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(int cap) : _ring_queue(cap), _cap(cap), _productor_step(0), _consumer_step(0)
    {
        sem_init(&_room_sem, 0, _cap);// 0:线程间共享
        sem_init(&_data_sem, 0, 0);

        pthread_mutex_init(&_productor_mutex, nullptr);
        pthread_mutex_init(&_consumer_mutex, nullptr);
    }
    // 入队列操作
    void Enqueue(const T& in)
    {
        // 先申请信号量再加锁的优点:
        // 多个线程来了,抢信号量都不互相干扰;虽然竞争锁的时候,只有一个线程竞争成功了,
        // 其余线程都在锁这里等待下次竞争锁,但是这些线程都提前预定了信号量,当这些线程被唤醒时,
        // 直接进来生产就行了,效率提高了

        // 生产行为
        P(_room_sem);// 先申请空间资源,申请信号量:本质是对资源的预定机制,是原子的
        Lock(_productor_mutex);// 加锁使得多生产多消费--->转为单生产但消费
        // 一定有空间!!!
        _ring_queue[_productor_step++] = in; // 生产,先访问再++
        _productor_step %= _cap;// 防止越界,维持环状特性
        Unlock(_productor_mutex);
        V(_data_sem);// 释放数据信号量,数据信号量+1
    }
    void Pop(T* out)
    {
        // 消费行为
        P(_data_sem);// 先申请数据资源
        Lock(_consumer_mutex);
        *out = _ring_queue[_consumer_step++];
        _consumer_step %= _cap;
        Unlock(_consumer_mutex);
        V(_room_sem);// 释放空间信号量:数据取走,空间露出来,把空间资源释放掉
    }
    ~RingQueue()
    {
        sem_destroy(&_room_sem);
        sem_destroy(&_data_sem);

        pthread_mutex_destroy(&_productor_mutex);
        pthread_mutex_destroy(&_consumer_mutex);
    }
private:
    // 1. 环形队列
    std::vector<T> _ring_queue;
    int _cap; // 环形队列的容量上限

    // 2. 生产和消费的下标
    int _productor_step;// 多个生产者线程都要竞争这个下标,下标只有一个,所以得争锁
    int _consumer_step;

    // 3. 定义信号量
    // 当队列为空为满时,生产和消费线程都在各自的信号量中的队列中休眠
    sem_t _room_sem; // 生产者关心(空间信号量)
    sem_t _data_sem; // 消费者关心(数据信号量)

    // 4. 定义锁,维护多生产之间、多消费之间的互斥关系
    pthread_mutex_t _productor_mutex;
    pthread_mutex_t _consumer_mutex;
};

Thread.hpp

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    template<typename T>
    using func_t = std::function<void(T&, std::string name)>;
    // typedef std::function<void(const T&)> func_t;

    template<typename T>
    class Thread
    {
    public:
        void Excute()
        {
            // std::cout << _threadname << std::endl;
            // 回调线程方法(生产者和消费者执行的函数)
            _func(_data, _threadname);
        }
    public:
        Thread(func_t<T> func, T& data, std::string name = "none-name")
            : _func(func), _data(data), _threadname(name), _stop(true)
        {}
        static void* threadroutine(void* args) // 类成员函数,形参是有this指针的!!
        {
            Thread<T>* self = static_cast<Thread<T> *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if (!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if (!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if (!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
        T& _data;  // 为了让所有的线程访问同一个全局变量
        func_t<T> _func;
        bool _stop;
    };
} // namespace ThreadModule

#endif

Main.cc主函数的编写

#define _CRT_SECURE_NO_WARNINGS 1

#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include <ctime>

// 我们需要的是向队列中投递任务
using namespace ThreadModule;
using ringqueue_t = RingQueue<Task>;

void Consumer(ringqueue_t& rq, std::string name)
{
    while (true)
    {
        sleep(2);
        // 1. 消费任务
        Task t;
        rq.Pop(&t);
        std::cout << "Consumer handler task: " << "[" << name << "]" << std::endl;
        // 2. 处理任务
        t();
    }
}

void Productor(ringqueue_t& rq, std::string name)
{
    srand(time(nullptr) ^ pthread_self());
    //int cnt = 10;
    while (true)
    {
        // 获取任务
        // 生产任务
        rq.Enqueue(Download);
        std::cout << "Productor : " << "[" << name << "]" << std::endl;
        // cnt--;
    }
}

void InitComm(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq, func_t<ringqueue_t> func, const std::string& who)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1) + "-" + who;
        threads->emplace_back(func, rq, name);// 构建出一个线程对象
        // threads->back()->Start();// 真正的创建出新线程
        // 当前线程对象在创建新线程和执行函数方法时,主线程可能会先一步回来又创建了一个线程对象,
        // 那么vector容器中最后一个元素就改变了,那么又执行容器的最后一个线程可能会出错,
        // 因为上一个线程的执行函数的过程还没有执行完,刚拿到最后一个线程的数据时,还没来得及使用,
        // 容器中最后一个线程变化了,那么就拿新线程的数据,但是新线程的数据并没有初始化完成,
        // 此时访问的对象那个就是一个空对象。
        // 所以我们应该把线程构建起来,最后统一启动StartAll
    }
}

void InitConsumer(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
{
    InitComm(threads, num, rq, Consumer, "consumer");
}

void InitProductor(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
{
    InitComm(threads, num, rq, Productor, "productor");
}

void WaitAllThread(std::vector<Thread<ringqueue_t>>& threads)
{
    for (auto& thread : threads)
    {
        thread.Join();
    }
}


void StartAll(std::vector<Thread<ringqueue_t>>& threads)
{
    for (auto& thread : threads)
    {
        std::cout << "start: " << thread.name() << std::endl;
        thread.Start();
    }
}

int main()
{
    ringqueue_t* rq = new ringqueue_t(10);
    std::vector<Thread<ringqueue_t>> threads;
    // std::vector<Thread<ThreadData>> threads;

    InitProductor(&threads, 1, *rq);
    InitConsumer(&threads, 1, *rq);

    StartAll(threads);

    WaitAllThread(threads);

    return 0;
}

Task.hpp function包装器的使用

Task是一个function<void()>的类型,也就是说用Task实例化出的模板可以接收任意类型的函数方法(也就是生产消费者模型中的任务)这样就最大的实现了来什么执行什么,大大提高了代码的灵活性可拓展性。

#pragma once
#include <iostream>
#include <functional>

using Task = std::function<void()>;

void Download()
{
    std::cout << "this is a download task" << std::endl;
}

总结

好了,本篇博客到这里就结束了,如果有更好的观点,请及时留言,我会认真观看并学习。
不积硅步,无以至千里;不积小流,无以成江海。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1926507.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Torch-Pruning 库入门级使用介绍

项目地址&#xff1a;https://github.com/VainF/Torch-Pruning Torch-Pruning 是一个专用于torch的模型剪枝库&#xff0c;其基于DepGraph 技术分析出模型layer中的依赖关系。DepGraph 与现有的修剪方法&#xff08;如 Magnitude Pruning 或 Taylor Pruning&#xff09;相结合…

Python:setattr()函数和__setattr__()魔术方法

相关阅读 Pythonhttps://blog.csdn.net/weixin_45791458/category_12403403.html?spm1001.2014.3001.5482 setattr()是一个python内置的函数&#xff0c;用于设置一个对象的属性值&#xff0c;一般情况下&#xff0c;可以通过点运算符(.)完成相同的功能&#xff0c;但是getat…

[激光原理与应用-112]:南京科耐激光-激光焊接-焊中检测-智能制程监测系统IPM介绍 - 16 - 常见的产品指标

目录 一、光学传感器技术指标&#xff1a;实时信号采集与信号处理 &#xff08;1&#xff09;适用激光器的功率范围宽&#xff1a; &#xff08;2&#xff09;感光范围&#xff1a;350nm~1750nm&#xff1a;从可见光到红外光 &#xff08;3&#xff09;信号类型&#xff1a…

NSSCTF_RE(二)暑期

[CISCN 2021初赛]babybc LLVM是那个控制流平坦化混淆&#xff0c;bc是IR指令文件 得到64位elf文件 然后就慢慢分析&#xff0c;感觉太妙了我靠 一个数独游戏&#xff0c;用二个二维数组添加约束&#xff0c;一个二维数组作地图&#xff0c;慢慢看 最后用 z3 来解数独&#xf…

k8s快速部署一个网站

1&#xff09;使用Deployment控制器部署镜像&#xff1a; kubectl create deployment web-demo --imagelizhenliang/web-demo:v1 kubectl get deployment,pods[rootk8s-matser ~]# kubectl get pods NAME READY STATUS RESTARTS A…

25届平安产险校招测评IQ新16PF攻略:全面解析与应试策略

尊敬的读者&#xff0c;您好。随着平安产险校招季的到来&#xff0c;许多应届毕业生正积极准备着各项测评。本文旨在提供一份详尽的测评攻略&#xff0c;帮助您更好地理解平安产险的校招测评流程&#xff0c;以及如何有效应对。 25届平安产险平安IQ&#xff08;新&#xff09;测…

Java 设计模式系列:外观模式

简介 外观模式&#xff08;Facade Pattern&#xff09;是一种设计模式&#xff0c;又名门面模式&#xff0c;是一种通过为多个复杂的子系统提供一个一致的接口&#xff0c;而使这些子系统更加容易被访问的模式。该模式对外有一个统一接口&#xff0c;外部应用程序不用关心内部…

2024-07-14 Unity插件 Odin Inspector2 —— Essential Attributes

文章目录 1 说明2 重要特性2.1 AssetsOnly / SceneObjectsOnly2.2 CustomValueDrawer2.3 OnValueChanged2.4 DetailedInfoBox2.5 EnableGUI2.6 GUIColor2.7 HideLabel2.8 PropertyOrder2.9 PropertySpace2.10 ReadOnly2.11 Required2.12 RequiredIn&#xff08;*&#xff09;2.…

基于Python thinker GUI界面的股票评论数据及投资者情绪分析设计与实现

1.绪论 1.1背景介绍 Python 的 Tkinter 库提供了创建用户界面的工具&#xff0c;可以用来构建股票评论数据及投资者情绪分析的图形用户界面&#xff08;GUI&#xff09;。通过该界面&#xff0c;用户可以输入股票评论数据&#xff0c;然后通过情感分析等技术对评论进行情绪分析…

昇思25天学习打卡营第14天 | ShuffleNet图像分类

昇思25天学习打卡营第14天 | ShuffleNet图像分类 文章目录 昇思25天学习打卡营第14天 | ShuffleNet图像分类ShuffleNetPointwise Group ConvolutionChannel ShuffleShuffleNet模块网络构建 模型训练与评估数据集训练模型评估模型预测 总结打卡 ShuffleNet ShuffleNetV1是旷世科…

大模型系列3--pytorch dataloader的原理

pytorch dataloader运行原理 1. 背景2. 环境搭建2.1. 安装WSL & vscode2.2. 安装conda & pytorch_gpu环境 & pytorch 2.112.3 命令行验证python环境2.4. vscode启用pytorch_cpu虚拟环境 3. 调试工具3.1. vscode 断点调试3.2. py-spy代码栈探测3.3. gdb attach3.4. …

基于锚框的物体检测过程

说明&#xff1a;基于锚框的物体检测过程&#xff1a;分为单阶段和两阶段 整体步骤&#xff1a; 提供目标候选区域&#xff1a; 锚框提供了一组预定义的候选区域&#xff0c;这些区域可以覆盖各种尺度和长宽比的目标。通过这些锚框&#xff0c;可以在不同的位置和不同的尺度上…

02-Charles的安装与配置

一、Charles的安装 Charles的下载地址&#xff1a;https://www.charlesproxy.com/。 下载之后&#xff0c;傻瓜式安装即可。 二、Charles组件介绍 主导航栏介绍&#xff1a; 请求导航栏介绍&#xff1a; 请求数据栏介绍&#xff1a; 三、Charles代理设置 四、客户端-windows代理…

【Linux】多线程_6

文章目录 九、多线程7. 生产者消费者模型生产者消费者模型的简单代码结果演示 未完待续 九、多线程 7. 生产者消费者模型 生产者消费者模型的简单代码 Makefile&#xff1a; cp:Main.ccg -o $ $^ -stdc11 -lpthread .PHONY:clean clean:rm -f cpThread.hpp&#xff1a; #i…

React学习笔记02-----

一、React简介 想实现页面的局部刷新&#xff0c;而不是整个网页的刷新。AJAXDOM可以实现局部刷新 1.特点 &#xff08;1&#xff09;虚拟DOM 开发者通过React来操作原生DOM&#xff0c;从而构建页面。 React通过虚拟DOM来实现&#xff0c;可以解决DOM的兼容性问题&#x…

NSSCTF_RE(一)暑期

[SWPUCTF 2021 新生赛]简单的逻辑 nss上附件都不对 没看明白怎么玩的 dnspy分析有三个 AchievePoint , game.Player.Bet - 22m; for (int i 0; i < Program.memory.Length; i) { byte[] array Program.memory; int num i; array[num] ^ 34; } Environment.SetEnvironment…

【CICID】GitHub-Actions-SpringBoot项目部署

[TOC] 【CICID】GitHub-Actions-SpringBoot项目部署 0 流程图 1 创建SprinBoot项目 ​ IDEA创建本地项目&#xff0c;然后推送到 Github 1.1 项目结构 1.2 Dockerfile文件 根据自身项目&#xff0c;修改 CMD ["java","-jar","/app/target/Spri…

Scrapy框架实现数据采集的详细步骤

需求描述&#xff1a; 本项目目标是使用Scrapy框架从宁波大学经济学院网站&#xff08;nbufe.edu.cn&#xff09;爬取新闻或公告详情页的内容。具体需求如下&#xff1a; 1、通过遍历多个页面&#xff08;共55页&#xff09;构建翻页URL。 2、使用scrapy自带的xpath从每页的…

STM32智能机器人避障系统教程

目录 引言环境准备智能机器人避障系统基础代码实现&#xff1a;实现智能机器人避障系统 4.1 数据采集模块 4.2 数据处理与控制模块 4.3 通信与网络系统实现 4.4 用户界面与数据可视化应用场景&#xff1a;机器人导航与避障问题解决方案与优化收尾与总结 1. 引言 智能机器人避…

Android ImageDecoder把瘦高/扁平大图相当于fitCenter模式decode成目标小尺寸Bitmap,Kotlin

Android ImageDecoder把瘦高/扁平大图相当于fitCenter模式decode成目标小尺寸Bitmap&#xff0c;Kotlin val sz Size(MainActivity.SIZE, MainActivity.SIZE)val src ImageDecoder.createSource(mContext?.contentResolver!!, uri)val bitmap ImageDecoder.decodeBitmap(sr…