【Linux取经路】基于信号量和环形队列的生产消费者模型

news2024/11/24 17:53:39

文章目录

  • 一、POSIX 信号量
  • 二、POSIX 信号量的接口
    • 2.1 sem_init——初始化信号量
    • 2.2 sem_destroy——销毁信号量
    • 2.3 sem_wait——等待信号量
    • 2.4 sem_post——发布信号量
  • 三、基于环形队列的生产消费者模型
    • 3.1 单生产单消费模型
    • 3.2 多生产多消费模型
    • 3.3 基于任务的多生产多消费模型
  • 四、结语

在这里插入图片描述

一、POSIX 信号量

共享资源也可以被看成多份,只要规定好每个线程的访问区域即可,此时就可以让多线程去并发的访问临界资源。

POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。但 POSIX 可以用于线程间同步。信号量本质是一把计数器,用来描述可用资源数目的,申请信号量时,其实就已经在间接的做判断,看资源是否就绪了,只要申请到信号量,那么说明资源一定是就绪的。

信号量只能保证,不让多余的线程来访问共享资源,即,当前共享资源有十份,信号量不会允许同时有十一个线程来访问临界资源。但是具体的资源分配是通过程序员编码去实现的。如果出现一个共享资源同时被两个线程访问,就属于程序员的编码 Bug。

二、POSIX 信号量的接口

2.1 sem_init——初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
  • sem:要初始化的信号量

  • pshared:0表示线程间共享,非0表示进程间共享。

  • value:信号量初始值

2.2 sem_destroy——销毁信号量

int sem_destroy(sem_t *sem);

2.3 sem_wait——等待信号量

int sem_wait(sem_t *sem); //P()
  • 功能:会将信号量的值减1

2.4 sem_post——发布信号量

int sem_post(sem_t *sem);//V() 
  • 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量的值加1

三、基于环形队列的生产消费者模型

只要生产和消费不访问同一个格子,那么生产和消费就可以同时进行。那生产和消费什么时候会指向同一个数据呢?答案是队列为空和为满的时候。

image-20240315221954915

image-20240315222028878

基于环形队列的生产消费者模型必须遵守以下三个原则:

  • 当生产和消费指向同一个资源的时候,只能一个人访问。为空的时候,由生产者去访问;为满的时候,由消费者去访问

  • 消费者不能超过生产者

  • 生产者不能把消费者套圈,因为这样会导致数据被覆盖

生产者最关心还剩多少空间(空间数量);消费者最关系还剩多少数据(数据数量)。因为有两种资源,所以需要定义两个信号量。

3.1 单生产单消费模型

// RingQueue.hpp
#pragma once

#include <pthread.h>
#include <vector>
#include <semaphore.h>

template<class T>
class RingQueue 
{
private:
    static const int defaultcap = 5;
    void P(sem_t *sem) // 申请一个信号量
    {
        sem_wait(sem); 
    }

    void V(sem_t *sem) // 归还一个信号量
    {
        sem_post(sem);
    }
public:
    RingQueue(int cap = defaultcap)
    :ringqueue_(cap), cap_(cap), c_step(0), p_step(0)
    {
        sem_init(&cdata_sem, 0, 0);
        sem_init(&pspace_sem, 0, cap_);
    }

    void Push(const T &data) // 生产行为
    {
        P(&pspace_sem);
        ringqueue_[p_step] = data;
        V(&cdata_sem);

        p_step++;
        p_step %= cap_;
    }

    void Pop(T *out) // 消费行为
    {
        P(&cdata_sem);
        *out = ringqueue_[c_step];
        V(&pspace_sem);
        c_step++;
        c_step %= cap_;
    }

    ~RingQueue()
    {
        sem_destroy(&cdata_sem);
        sem_destroy(&pspace_sem);
    }
private:
    std::vector<T> ringqueue_; // 环形队列
    int cap_; // 容量
    int c_step; // 消费者下一个要消费的位置
    int p_step; // 生产者下一个要生产的位置

    sem_t cdata_sem; // 数据资源
    sem_t pspace_sem; // 空间资源
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>

using namespace std;

void *Consumer(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);

    while(true)
    {
        int data = 0;
        rq->Pop(&data);
        cout << "Consumer is running... get a data: " << data << endl;

        // 模拟处理数据
        usleep(1000000);
    }
}

void *Productor(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);
    while(true)
    {
        // 获取数据
        usleep(10000); // 模拟获取数据
        int data = rand() % 10;
        rq->Push(data);
        cout << "Productor is running... produce a data: " << data << endl;
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    pthread_t c, p;
    RingQueue<int> *rq = new RingQueue<int>();
    pthread_create(&c, nullptr, Consumer, rq);
    pthread_create(&p, nullptr, Productor, rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

基于环形队列的单生产单消费模型

互斥与同步的体现:当生产下标和消费下标相同的时候,只允许一个来访问,这就是互斥性的体现。当队列为空的时候,让生产者去访问资源,当队列为满的时候,让消费者去访问资源,这就是在指向同一个位置时,让生产和消费具有一定的顺序性,这就是同步性的体现。当队列不为空或不为满的时候,生产下标和消费下标不同,此时两个线程并发执行,并没有体现出很强的互斥特性。

3.2 多生产多消费模型

此时需要对下标资源进行保护。因为生产下标和消费下标各自只有一份,不允许同时有多个生产线程去访问生产下标,消费线程也一样。因此需要通过加锁来实现生产线程之间的互斥和消费线程之间的互斥。

先加锁还是先申请信号量?答案是先申请信号量,以生产线程为例,这样可以让所有生产线程并发的去执行,什么意思呢?如果是先加锁再申请信号量的话,因为始终只有一个生产者线程能够申请到锁,所以也就只有一个生产者线程能去申请信号量,其他生产者线程只能干巴巴的等待锁被释放。这时申请锁和申请信号量的动作是串行的。而先申请信号量的话,可以保证虽然只有一个线程能够申请到锁,但是其他没有锁的线程也可以不用闲着,可以先去申请信号量,因为信号量的申请是原子的,因此也不需要加锁进行保护,只要能申请到信号量,就说明资源还有,此时那些申请到信号量的线程就可能等待锁被释放,拿到锁之后就可以去执行相应的代码了。

// RingQueue.hpp
#pragma once

#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <string>

template<class T>
class RingQueue 
{
private:
    static const int defaultcap = 5;
    void P(sem_t *sem) // 申请一个信号量
    {
        sem_wait(sem); 
    }

    void V(sem_t *sem) // 归还一个信号量
    {
        sem_post(sem);
    }

    void Lock(pthread_mutex_t *mutex)
    {
        pthread_mutex_lock(mutex);
    }

    void Unlock(pthread_mutex_t *mutex)
    {
        pthread_mutex_unlock(mutex);
    }
public:
    RingQueue(int cap = defaultcap)
    :ringqueue_(cap), cap_(cap), c_step(0), p_step(0)
    {
        sem_init(&cdata_sem, 0, 0);
        sem_init(&pspace_sem, 0, cap_);
        pthread_mutex_init(&c_mutex, nullptr);
        pthread_mutex_init(&p_mutex, nullptr);
    }

    void Push(const T &data) // 生产行为
    {
        P(&pspace_sem);
        Lock(&p_mutex);
        ringqueue_[p_step] = data;
        p_step++;
        p_step %= cap_;
        Unlock(&p_mutex);
        V(&cdata_sem);
    }

    void Pop(T *out) // 消费行为
    {
        P(&cdata_sem); // 信号量资源是不需要保护的,因为它的操作是原子的,临界区中的代码要尽可能的少,所以不需要把信号量的申请放在加锁之后
        Lock(&c_mutex);
        *out = ringqueue_[c_step];
        c_step++;
        c_step %= cap_;
        Unlock(&c_mutex);
        V(&pspace_sem);
    }

    ~RingQueue()
    {
        sem_destroy(&cdata_sem);
        sem_destroy(&pspace_sem);

        pthread_mutex_destroy(&c_mutex);
        pthread_mutex_destroy(&p_mutex);
    }
private:
    std::vector<T> ringqueue_; // 环形队列
    int cap_; // 容量
    int c_step; // 消费者下一个要消费的位置
    int p_step; // 生产者下一个要生产的位置

    sem_t cdata_sem; // 数据资源
    sem_t pspace_sem; // 空间资源

    pthread_mutex_t c_mutex; // 对消费下标的保护
    pthread_mutex_t p_mutex; // 对生产下标的保护
};

template <class T>
class Message
{
public:
    Message(std::string thread_name, RingQueue<T> *ringqueue)
    :thread_name_(thread_name), ringqueue_(ringqueue)
    {}

    std::string &get_thread_name()
    {
        return thread_name_;
    }

    RingQueue<T> *get_ringqueue()
    {
        return ringqueue_;
    }
private:
    std::string thread_name_;
    RingQueue<T> *ringqueue_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>

using namespace std;

void *Consumer(void *args)
{
    Message<int> *message = static_cast<Message<int> *>(args);
    RingQueue<int> *rq = message->get_ringqueue();
    string name = message->get_thread_name();
    while (true)
    {
        int data = 0;
        rq->Pop(&data);
        printf("%s is running... get a data: %d\n", name.c_str(), data);

        // 模拟处理数据
        // usleep(1000000);
    }
}

void *Productor(void *args)
{
    Message<int> *message = static_cast<Message<int> *>(args);
    RingQueue<int> *rq = message->get_ringqueue();
    string name = message->get_thread_name();
    while (true)
    {
        // 获取数据
        // usleep(1000000); // 模拟获取数据
        int data = rand() % 10;
        rq->Push(data);
        printf("%s is running... produce a data: %d\n", name.c_str(), data);
        usleep(1000000);
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    pthread_t c[3], p[5];
    RingQueue<int> *rq = new RingQueue<int>(); 
    vector<Message<int>*> messages; 

    for (int i = 0; i < 5; i++)
    {
        Message<int> *message = new Message<int>("Produttor Thread "+to_string(i), rq);
        pthread_create(p + i, nullptr, Productor, message);
        messages.push_back(message);
    }

    for (int i = 0; i < 3; i++)
    {
        Message<int> *message = new Message<int>("Consumer Thread "+to_string(i), rq);
        pthread_create(c + i, nullptr, Consumer, message);
        messages.push_back(message);
    }


    for (int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }

    for (auto message : messages)
    {
        delete message;
    }

    delete rq;
    return 0;
}

基于环形队列的多生产多消费模型

3.3 基于任务的多生产多消费模型

RingQueue 的内容不变

// Task.h
#include <iostream>
#include <string>

enum
{
    DIVERROR = 1,
    MODERROR,
    UNKNOWERRROR
};

class Task
{
public:
    Task(int a = 0, int b = 0, char op = '+')
    :data1_(a), data2_(b), op_(op), result_(0), exitcode_(0)
    {}

    void run()
    {
        switch(op_)
        {
            case '+':
                result_ = data1_ + data2_;
                break;
            case '-':
                result_ = data1_ - data2_;
                break;
            case '*':
                result_ = data1_ * data2_;
                break;
            case '/':
                if(data2_ == 0) exitcode_ = DIVERROR;
                else result_ = data1_ / data2_;
                break;
            case '%':
                if(data2_ == 0) exitcode_ = MODERROR;
                else result_ = data1_ % data2_;
                break;
            default:
                exitcode_ = UNKNOWERRROR;
                break;
        }
    }

    std::string result_to_string()
    {
        std::string ret = std::to_string(data1_);
        ret += ' ';
        ret += op_;
        ret += ' ';
        ret += std::to_string(data2_);
        ret += ' ';
        ret += '=';
        ret += ' ';
        ret += std::to_string(result_);
        ret += "[exitcode: ";
        ret += std::to_string(exitcode_);
        ret += ']';

        return ret;
    }

    std::string get_task()
    {
        std::string ret = std::to_string(data1_);
        ret += ' ';
        ret += op_;
        ret += ' ';
        ret += std::to_string(data2_);
        ret += ' ';
        ret += '=';
        ret += ' ';
        ret += '?';
        return ret;
    }    
private:
    int data1_;
    int data2_;
    char op_;
    int result_;
    int exitcode_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>
#include "Task.h"

using namespace std;

const std::string opers = "+-*/%";

void *Consumer(void *args)
{
    Message<Task> *message = static_cast<Message<Task> *>(args);
    RingQueue<Task> *rq = message->get_ringqueue();
    string name = message->get_thread_name();
    while (true)
    {
        // 获取任务
        // int data = 0;
        Task task;
        rq->Pop(&task);

        // 对任务做处理
        task.run();
        printf("%s is running... get a data: %s\n", name.c_str(), task.result_to_string().c_str());

        // 模拟处理数据
        // usleep(1000000);
    }
}

void *Productor(void *args)
{
    Message<Task> *message = static_cast<Message<Task> *>(args);
    RingQueue<Task> *rq = message->get_ringqueue();
    string name = message->get_thread_name();
    int len = opers.size();
    while (true)
    {
        // 获取数据
        // usleep(1000000); // 模拟获取数据
        // int data = rand() % 10;

        // 模拟获取数据
        int data1 = rand() % 10 + 1; // [1, 10]
        usleep(10);

        int data2 = rand() % 13; // [0, 13]
        usleep(10);

        char op = opers[rand() % len];

        Task task(data1, data2, op);
        
        // 生产数据
        rq->Push(task);
        // printf("%s is running... produce a data: %d\n", name.c_str(), data);
        printf("%s is running... produce a Task: %s\n", name.c_str(), task.get_task().c_str());
        usleep(1000000);
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    pthread_t c[3], p[2];
    RingQueue<Task> *rq = new RingQueue<Task>(); 
    vector<Message<Task>*> messages; 

    for (int i = 0; i < 5; i++)
    {
        Message<Task> *message = new Message<Task>("Produttor Thread "+to_string(i), rq);
        pthread_create(p + i, nullptr, Productor, message);
        messages.push_back(message);
    }

    for (int i = 0; i < 3; i++)
    {
        Message<Task> *message = new Message<Task>("Consumer Thread "+to_string(i), rq);
        pthread_create(c + i, nullptr, Consumer, message);
        messages.push_back(message);
    }

    // 等待子线程
    for (int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }

    // 释放资源
    for (auto message : messages)
    {
        delete message;
    }

    delete rq;
    return 0; 
}

基于环形队列的多生产多消费模型(基于任务的)

四、结语

今天的分享到这里就结束啦!如果觉得文章还不错的话,可以三连支持一下,春人的主页还有很多有趣的文章,欢迎小伙伴们前去点评,您的支持就是春人前进的动力!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1689852.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

构建健壮的机器学习大数据平台:任务实现与数据治理的关键

随着数据驱动决策成为现代企业的核心&#xff0c;构建安全、可靠且可扩展的大数据平台变得至关重要。这样的平台不仅需要支持复杂的机器学习任务&#xff0c;还需要在数据质量、合规性和分发方面提供严格的控制。本文旨在探讨构建大型企业机器学习大数据平台时需要考虑的关键要…

项目如何有效做资源管理?易趋项目管理软件让资源管理可视化

在项目管理的过程中&#xff0c;有效的资源管理能够确保资源得到合理的分配和使用&#xff0c;避免资源的浪费和冗余&#xff0c;进而提高整体工作效率、确保项目的成功&#xff1b;同时降低组织的运营成本。 但在项目推进过程中&#xff0c;项目经理总会面临各种资源管理的难…

基于Tensorflow卷积神经网络人脸识别公寓人员进出管理系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景与意义 随着科技的快速发展和智能化水平的提高&#xff0c;公寓管理面临着越来越多的挑战。传统的公寓…

HTML静态网页成品作业(HTML+CSS)——我的家乡云南保山介绍网页(3个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有3个页面。 二、作品演示 三、代…

轻松同步:将照片从三星手机传输到iPad的简便方法

概括 想要在新 iPad 上查看三星照片吗&#xff1f;但是&#xff0c;如果您不知道如何将照片从三星手机传输到 iPad&#xff0c;则无法在 iPad 上查看图片。为此&#xff0c;本文分享了 7 个有用的方法&#xff0c;以便您可以使用它们在不同操作系统之间轻松发送照片。现在&…

leetcode-盛水最多的容器-109

题目要求 思路 1.正常用双循环外循环i从0开始&#xff0c;内循环从height.size()-1开始去计算每一个值是可以的&#xff0c;但是因为数据量太大&#xff0c;会超时。 2.考虑到超时&#xff0c;需要优化一些&#xff0c;比如第一个选下标1&#xff0c;第二个选下标3和第一个选下…

【笔记】从零开始做一个精灵龙女-素模阶段

事前准备 1.在ps标记好位置先&#xff0c;斜方肌&#xff0c;腰线&#xff0c;耻骨&#xff0c;膝盖&#xff0c;脚 2.导入素模&#xff0c;对好位置 软选择 1.原画上半身很短&#xff0c;所以这里把上半身做的也短一些 选择上半身的点-软选择-衰减调整-箭头调整 如果要调整…

mysql数据库innodb体系结构(一、内存结构 与二、物理存储结构)

文章目录 InnoDB存储引擎结构图innoDB体系结构一、内存结构1.Buffer Pool2.Change Pool3.Log Buffer 二、物理存储结构1.系统表空间2.独立表空间3.Redo日志1、redo 日志 4.Undo日志1、undo 日志 回滚段中的UNDO日志分为两种&#xff1a;UNDO 日志存储结构 InnoDB存储引擎结构图…

Flat Ads获广东电视台报道!CEO林啸:助力更多企业实现业务全球化增长

近日,在广州举行的第四届全球产品与增长展会(PAGC2024)上,Flat Ads凭借其卓越的一站式全球化营销和创新的变现方案大放异彩,不仅吸引了众多业界目光,同时也在展会上斩获了备受瞩目的“金帆奖”,展现了其在全球化营销推广领域的卓越实力和专业服务。 在大会现场,Flat Ads的CEO林…

差分约束题解

目录 注意点&#xff1a; 思路&#xff1a; SPFA和Dij的不同点&#xff1a; Dij: SPFA: AC代码&#xff1a; 扩展&#xff1a; 题目链接&#xff1a;【模板】差分约束 - 洛谷 注意点&#xff1a; 注意这一题不能用Dij&#xff0c;只能用SPFA 因为这样子才可以得出这个不…

【简单介绍下近邻算法】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

全域运营是割韭菜吗?常见套路有哪些?

随着全域运营赛道的全面开启&#xff0c;全域运营服务商和全域运营系统的数量迅速增加&#xff0c;持续激发赛道活力的同时&#xff0c;也让一些试图用全域运营割韭菜的人有了可趁之机。 值得庆幸的是&#xff0c;由于当前全域运营赛道刚兴起不久&#xff0c;因此&#xff0c;割…

Raylib 绘制自定义字体的一种套路

Raylib 绘制自定义字体是真的难搞。我的需求是程序可以加载多种自定义字体&#xff0c;英文中文的都有。 我调试了很久成功了&#xff01; 很有用的参考&#xff0c;建议先看一遍&#xff1a; 瞿华&#xff1a;raylib绘制中文内容 个人笔记&#xff5c;Raylib 的字体使用 - …

Nginx - 健康检查终极指南:探索Upstream Check模块

文章目录 概述upstream_check_module模块安装和配置指南模块安装步骤基本配置示例详细配置说明检查类型和参数常见问题及解决方案 SSL检查和DNS解析功能SSL检查配置示例和说明配置示例 DNS解析配置示例和说明配置示例 结合实际应用场景的高级配置示例综合SSL检查与DNS解析 总结…

代码随想录算法训练营第三天| 203.移除链表元素、 707.设计链表、 206.反转链表

203.移除链表元素 题目链接&#xff1a; 203.移除链表元素 文档讲解&#xff1a;代码随想录 状态&#xff1a;没做出来&#xff0c;做题的时候定义了一个cur指针跳过了目标val遍历了一遍链表&#xff0c;实际上并没有删除该删的节点。 错误代码&#xff1a; public ListNode re…

一键恢复安卓手机数据:3个快速简便的解决方案!

安卓手机作为我们不可或缺的数字伙伴&#xff0c;承载着大量珍贵的个人和工作数据。然而&#xff0c;随着我们在手机上进行各种操作&#xff0c;不可避免地会遇到一些令人头痛的问题&#xff0c;比如意外删除文件、系统故障或其他不可预见的情况&#xff0c;导致重要数据的丢失…

springboot基于Web前端技术的java养老院管理系统_utbl7

3.普通用户模块包括&#xff1a;普通会员的注册、养老院客房查询、养老院留言查询、预约老人基本信息登记、选择房间、用户缴费的功能。 4.数据信息能够及时进行动态更新&#xff0c;增删&#xff0c;用户搜素方便&#xff0c;使用户可以直接浏览相关信息&#xff0c;要考虑便于…

埋点——about前端

所谓“埋点”&#xff0c;是数据采集领域(尤其是用户行为数据采集领域)的术语&#xff0c;指的是针对特定用户行为或事件进行捕获、处理和发送的相关技术及其实施过程。比如用户某个icon点击次数、观看某个视频的时长等等,埋点的技术实质&#xff0c;是先监听软件应用运行过程中…

C#数据类型变量、常量

一个变量只不过是一个供程序操作的存储区的名字。 在 C# 中&#xff0c;变量是用于存储和表示数据的标识符&#xff0c;在声明变量时&#xff0c;您需要指定变量的类型&#xff0c;并且可以选择性地为其分配一个初始值。 在 C# 中&#xff0c;每个变量都有一个特定的类型&…

只需提交几次代码就能轻松实现系统级的变更!——“基础设施即代码”模式与实践

“基础设施即代码”模式与实践 基础设施即代码&#xff08;Infrastructure as Code&#xff0c;IaC&#xff09;是指利用脚本、配置或编程语言创建和维护基础设施的一组实践和流程。通过IaC&#xff0c;我们可以轻松测试各个组件、实现所需的功能并在最小化停机时间的前提下进行…