【Linux系统编程二十九】基于信号量的环形队列生产消费模型

news2025/1/13 13:07:18

【Linux系统编程二十九】基于信号量的环形队列生产消费模型

  • 一.信号量
    • 1.P操作
    • 2.V操作
  • 二.环形队列
  • 三.单生产单消费场景
    • 1.信号量维持生产消费之间互斥同步
  • 四.多生产多消费场景
    • 1.加锁维持生产生产,消费消费互斥
  • 五.总结

一.信号量

当共享资源被当成整体使用时,则共享资源的数量要么是1,要么是0。
当被访问使用时,共享资源的数量就为0,当没有被使用时,数量就为1。

共享资源是可以被分成多份来使用的,只不过不同的线程访问的是该共享资源的不同的区域,它是允许多个线程并发访问的,只不过访问的是不同的区域。而信号量就是用来表示这个大的共享资源被分成多少份小的资源的个数。
信号量本质就是一把计数器,用来描述资源数量的多少。
信号量表示对资源的预定机制,只要你信号量申请成功了,就表明你肯定可以使用这块资源。不需要再判断资源是否就绪,因为在申请信号量的时候就已经判断好了,申请成功就可以使用,申请失败就不能使用就去挂起等待。
在这里插入图片描述

互斥锁的本质也就是信号量,只不过是二元信号量,结果要么是1要么是0。申请到锁了,就可以去访问共享资源,访问是能访问,单是使用是有使用条件的,如果使用条件不满足,也无法使用。
在这里插入图片描述

sem_t 就是信号量类型,使用它需要初始化:
在这里插入图片描述

1.P操作

P操作就是申请信号量,也就是先预定关心的资源。因为信号量本质就是一把计数器,用来描述临界资源数量的多少,而申请一个信号量,就是预定临界资源,对应的临界资源总数就要减一。
申请成功就可以使用该资源,申请失败,就说明没有资源给你使用,那么就等待。
在这里插入图片描述

2.V操作

V操作就是释放信号量,也就是释放对应的资源,对应的临界资源总数就要加一。在这里插入图片描述

二.环形队列

利用环形队列实现的生产消费模型需要满足几个条件:
首先生产者往队列里投入数据时,生产者就往后走。
消费者从队列里拿走数据时,消费者就往后走。
当队列为空,为满的时候,生产和消费是在同一个位置上的,而不为空,不为满的时候,生产和消费一定指向不同的位置。

1.当队列为空或者为满时,生产者和消费者都在同一个位置,而为了满足321原则,生产者和消费者之间必须互斥,所以只允许一个人访问队列。
并且为空的时候,肯定是生产者先访问,而消费后访问。当为满时,肯定是消费者先访问,生产者后访问。
2.而当队列不为空,不为满时,生产和消费一定指向的是不同的位置,这时生产和消费可以同时访问队列。

原则1:当实指向同一个位置时,只能有一个人访问
原则2:消费者是不能超过生产者的,生产者生产多少数据,消费者最多只能消费多少数据
原则3:生产者不能将消费者套一个圈,不然原先的数据就会被覆盖。

在这里插入图片描述
在模型中我们利用vector数组来模拟环形队列。

三.单生产单消费场景

在这里插入图片描述

生产者关注的资源是什么呢?消费者关心的资源是什么呢?

生产者关注的是队列里是否还有空间,关注的是空间资源。
消费者关注的是队列里是否还有数据,关注的是数据资源。
而一开始,队列里并没有数据资源,但空间资源是有的,就是队列的大小。

因为信号量是描述临界资源的数量,所以我们就可以定义两个信号量分别表示对应的,空间资源,数据资源的多少。

在这里插入图片描述

【1】生产者要想往队列里生产,需要先申请空间资源,也就是空间信号量。申请成功后,就可以进行生产,生产完毕后,当前的空间被生产者生产的数据着,并且队列数据多了一个,所以需要释放数据资源,也就是释放数据信号量。如果申请信号量失败,那么就等待挂起。
在这里插入图片描述

【2】消费者想要从队列里消费,需要先申请数据资源,也就是数据信号量。申请成功后,就可以进行消费,消费完毕后,数据被消费者拿走,而当前的空间多了一个,所以需要释放空间资源,也就是释放空间信号量。如果申请失败,那么就等待挂起。
在这里插入图片描述

1.信号量维持生产消费之间互斥同步

在环形队列生产模型中,我们要探讨的三种关系,其中生产者和消费者之间的互斥和同步关系是由信号量来维持的。

在这里插入图片描述

#include <pthread.h>
#include <iostream>
#include <string>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "TASK.hpp"
struct ThreadData
{
    RingQueue<TASK>* RQ;
    std::string name;
};
void *Consumer(void *args)
{
   
    ThreadData *td=static_cast<ThreadData*>(args);
    RingQueue<TASK> *rq=td->RQ;

    while (true)
    {
        //sleep(2);
        // 1.消费数据
        TASK t;
        rq->Pop(&t);

        // 2.处理数据
        t.run();
        std::cout << "Comsumer get task,task is: " << t.GetTASK() <<"who: "<<td->name<< " reslut :" << t.Getresult() << std::endl;
    }
    return nullptr;
}

void *Producer(void *args)
{
    ThreadData *td=static_cast<ThreadData*>(args);
    RingQueue<TASK> *rq=td->RQ;

    int len = opera.size();
    while (true)
    {
       
        // 1.获取数据
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opera[rand() % len];
        // 2.生产数据
        TASK t(data1, data2, op);
        rq->Push(t);
        std::cout << "Productor task done,task is: " << t.GetTASK() <<" who: "<<td->name<< std::endl;
         sleep(2);
    }
    return nullptr;
}


int main()
{

       srand(time(nullptr));
        RingQueue<TASK> *rq = new RingQueue<TASK>();
        pthread_t c, p;
        ThreadData* td=new ThreadData();
        td->name="Procductor-"+std::to_string(i);
        td->RQ=rq;
        pthread_create(p + i, nullptr, Producer, td);
  //在创建线程的时候可以给线程定义基本的属性比如名字,以参数的形式传给线程
        pthread_create(c + i, nullptr, Consumer, td);
   
        pthread_join(c[i], nullptr);
        pthread_join(p[i], nullptr);
    
}

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
// 先模拟单生产单消费
const static int defaultcap = 5; // 默认大小
template <class T>
class RingQueue
{

public:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
    RingQueue(int cap = defaultcap) : _ringqueue(cap), _cap(cap), c_step(0), p_step(0)
    {
        sem_init(&cdata_sem, 0, 0);
        sem_init(&pspace_sem, 0, cap);
    }
    ~RingQueue()
    {
        sem_destroy(&cdata_sem);
        sem_destroy(&pspace_sem);

        pthread_mutex_destroy(&c_mutex);
        pthread_mutex_destroy(&p_mutex);
    }
//注意的是,对下标加锁,应该在申请信号量后面,从两个方面解释:信号量不需要保护,本身就是原子的。
//先申请信号量,后加锁可以提高效率,一个并发度,别人正在执行时,其他线程还可以申请信号量
    void Push(const T &in)
    {
        // 生产者在生产数据之前,需要先申请对应的信号量,申请成功说明可以生产数据
        P(pspace_sem); // 生产者申请空间资源
     
        _ringqueue[p_step] = in;
        p_step++;
        p_step %= _cap; // 维持环形
       
        V(cdata_sem); // 格子已经被占了,数据多了一个,所以释放数据资源
    }

    void Pop(T *out)
    {
        // 消费者在消费数据之前,也需要申请对应的信号量,申请成功,说明可以消费

        P(cdata_sem); // 消费者申请数据资源
        *out = _ringqueue[c_step];
        c_step++;
        c_step %= _cap; // 维持环形
        V(pspace_sem); // 拿走一个数据,空间资源多出来一个
    }

private:                       // 环形队列的基本属性
    std::vector<T> _ringqueue; // 用vector来模拟环形队列
    int _cap;                  // 该环形队列的最大值

    int c_step; // 消费者下标
    int p_step; // 生产者下标

    // 消费者和生产者各自关心的资源

    sem_t cdata_sem;  // 消费者关心的数据资源
    sem_t pspace_sem; // 生产者关心的空间资源
};
#pragma once
#include <iostream>
#include <string>

std::string opera="+-*/%";
class TASK
{

public:
    TASK()
    {}
    TASK(int data1, int data2, char op) : _data1(data1), _data2(data2), _oper(op)
    {
    }
    void run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        case '-':
            _result = _data1 - _data2;
            break;
        case '*':
            _result = _data1 * _data2;
            break;
        case '/':
        {
            if (_data2 == 0)
                _exitcode = 1;
            else
                _result = _data1 / _data2;
        }
        break;
        case '%':
        {
            if (_data2 == 0)
                _exitcode = 2;
            else
                _result = _data1 % _data2;
        }
        break;

        default:
        _exitcode=3;
            break;
        }
    }

    std::string GetTASK()
    {
       std::string r=std::to_string(_data1);
       r+=_oper;
       r+=std::to_string(_data2);
       r+="=?";
       return r;
    }
    std::string Getresult()
    {
        std::string result=std::to_string(_data1);
        result+=_oper;
        result+=std::to_string(_data2);
        result+='=';
        result+=std::to_string(_result);
        result+="[code:";
        result+=std::to_string(_exitcode);
        result+=']';
        return result;
    }

private:
    int _data1;
    int _data2;
    char _oper;

    int _result;
    int _exitcode;
};

四.多生产多消费场景

在单生产单消费模型中,生产者只有一个,而当多生产时,就可能出现问题。
比如生产过程中Push,同时进入多个生产者,集中申请信号量,因为下标只有一个,那么线程就会对同一个下标进行竞争,A生产者要往队列该下标里写,B生产者也要往队列该下标里写,最后就会造成数据的覆盖丢失。
在这里插入图片描述

在这里插入图片描述

在环形队列里,不能存在多个生产者和多个消费者同时在环形队列的情况。这就要求生产者和生产者之间要进行互斥,消费者和消费者之间要进行互斥。
所以需要锁来实现它们之间的互斥,因为在大部分情况下,生产者和消费者不是在同一个位置上,所以可以并发执行,这时如果只设置一把锁,就会降低并发度,因为这两种关系的线程是关系不同的资源,所以使用两把锁,来加锁不同的资源。
在这里插入图片描述

1.加锁维持生产生产,消费消费互斥

所以加锁是为了保护环形队列下标不被多线程并发访问。
不过这里就存在一个问题:加锁是在申请信号量之前加锁呢?还是在申请信号量之后加锁呢?

答案:在申请信号量之后加锁。为什么?
原因1:因为申请信号量本身就是原子的,不需要保护。
原因2:因为在申请信号之后加锁,可以提高效率,因为当线程持有锁执行期间,其他线程还可以进行继续申请信号量(申请成功后,就在等待锁资源,申请失败,挂起等待),等持有锁线程释放锁后,就可以直接去竞争锁。也就是多线程可以集中在申请信号量这里,先将资源预定好了,等有锁资源时,就可以直接访问临界资源。
所以在生产者生产的过程中,其他生产者还可以去申请信号量,提高并发度。
在这里插入图片描述

#include <pthread.h>
#include <iostream>
#include <string>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "TASK.hpp"
struct ThreadData
{
    RingQueue<TASK>* RQ;
    std::string name;
};
void *Consumer(void *args)
{
   
    ThreadData *td=static_cast<ThreadData*>(args);
    RingQueue<TASK> *rq=td->RQ;

    while (true)
    {
        //sleep(2);
        // 1.消费数据
        TASK t;
        rq->Pop(&t);

        // 2.处理数据
        t.run();
        std::cout << "Comsumer get task,task is: " << t.GetTASK() <<"who: "<<td->name<< " reslut :" << t.Getresult() << std::endl;
    }
    return nullptr;
}

void *Producer(void *args)
{
    ThreadData *td=static_cast<ThreadData*>(args);
    RingQueue<TASK> *rq=td->RQ;

    int len = opera.size();
    while (true)
    {
       
        // 1.获取数据
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opera[rand() % len];
        // 2.生产数据
        TASK t(data1, data2, op);
        rq->Push(t);
        std::cout << "Productor task done,task is: " << t.GetTASK() <<" who: "<<td->name<< std::endl;
         sleep(2);
    }
    return nullptr;
}


int main()
{

    srand(time(nullptr));
    RingQueue<TASK> *rq = new RingQueue<TASK>();
    pthread_t c[5], p[3];
    for (int i = 0; i < 3; i++)
    {
        ThreadData* td=new ThreadData();
        td->name="Procductor-"+std::to_string(i);
        td->RQ=rq;
        pthread_create(p + i, nullptr, Producer, td);
    }
    for (int i = 0; i < 5; i++)//在创建线程的时候可以给线程定义基本的属性比如名字,以参数的形式传给线程
    {
         
        ThreadData* td=new ThreadData();
        td->name="Consumer-"+std::to_string(i);
        td->RQ=rq;
        pthread_create(c + i, nullptr, Consumer, td);
    }
    

    for (int i = 0; i < 1; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 1; i++)
    {
        pthread_join(p[i], nullptr);
    }
}

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
// 先模拟单生产单消费
const static int defaultcap = 5; // 默认大小
template <class T>
class RingQueue
{

public:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
    void Lock(pthread_mutex_t &mutex)
    {
      pthread_mutex_lock(&mutex);
    }

    void Unlock(pthread_mutex_t &mutex)
    {
      pthread_mutex_unlock(&mutex);
    }
    RingQueue(int cap = defaultcap) : _ringqueue(cap), _cap(cap), c_step(0), p_step(0)
    {
        sem_init(&cdata_sem, 0, 0);
        sem_init(&pspace_sem, 0, cap);

        pthread_mutex_init(&c_mutex, nullptr);
        pthread_mutex_init(&p_mutex, nullptr);
    }
    ~RingQueue()
    {
        sem_destroy(&cdata_sem);
        sem_destroy(&pspace_sem);

        pthread_mutex_destroy(&c_mutex);
        pthread_mutex_destroy(&p_mutex);
    }
//注意的是,对下标加锁,应该在申请信号量后面,从两个方面解释:信号量不需要保护,本身就是原子的。
//先申请信号量,后加锁可以提高效率,一个并发度,别人正在执行时,其他线程还可以申请信号量
    void Push(const T &in)
    {
        // 生产者在生产数据之前,需要先申请对应的信号量,申请成功说明可以生产数据
        P(pspace_sem); // 生产者申请空间资源
        
        Lock(p_mutex);
        _ringqueue[p_step] = in;
        p_step++;
        p_step %= _cap; // 维持环形
        Unlock(p_mutex);
       
        V(cdata_sem); // 格子已经被占了,数据多了一个,所以释放数据资源
    }

    void Pop(T *out)
    {
        // 消费者在消费数据之前,也需要申请对应的信号量,申请成功,说明可以消费

        P(cdata_sem); // 消费者申请数据资源
        Lock(c_mutex);
        *out = _ringqueue[c_step];
        c_step++;
        c_step %= _cap; // 维持环形
        Unlock(c_mutex);

        V(pspace_sem); // 拿走一个数据,空间资源多出来一个
    }

private:                       // 环形队列的基本属性
    std::vector<T> _ringqueue; // 用vector来模拟环形队列
    int _cap;                  // 该环形队列的最大值

    int c_step; // 消费者下标
    int p_step; // 生产者下标

    // 消费者和生产者各自关心的资源

    sem_t cdata_sem;  // 消费者关心的数据资源
    sem_t pspace_sem; // 生产者关心的空间资源

    // 如果是多个生产者,多个消费者,那么就需要用到锁
    // 因为生产者与生产者直接会竞争下标,而下标只有一个,所以需要对下标进行加锁保护,实现生产者与生产者之间的互斥。
    // 而消费者之间也会竞争下标,生产消费因为关心的是不同资源,所以各自保护各自的下标。
    pthread_mutex_t c_mutex;
    pthread_mutex_t p_mutex;
};

五.总结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1401364.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java-NIO篇章(4)——Reactor反应器模式

前面已经讲过了Java-NIO中的三大核心组件Selector、Channel、Buffer&#xff0c;现在组件我们回了&#xff0c;但是如何实现一个超级高并发的socket网络通信程序呢&#xff1f;假设&#xff0c;我们只有一台内存为32G的Intel-i710八核的机器&#xff0c;如何实现同时2万个客户端…

PID笔记

Improving the Beginner’s PID 参考资料 Improving the Beginner’s PID – Introduction The Beginner’s PID 以下是每个人第一次学习的PID方程&#xff1a; 这导致几乎每个人都编写了以下PID控制器&#xff1a; /*working variables*/ unsigned long lastTime; double…

Mongo集群入门

一、前言 MongoDB 有三种集群架构模式&#xff0c;分别为主从复制&#xff08;Master-Slaver&#xff09;、副本集&#xff08;Replica Set&#xff09;和分片&#xff08;Sharding&#xff09;模式。 Master-Slaver 是一种主从复制的模式&#xff0c;目前已经不推荐使用。 Re…

vue.js js 雪花算法ID生成 vue.js之snowFlake算法

随着前端业务越来越复杂&#xff0c;自定义表单数据量比较大&#xff0c;每条数据的id生成则至关重要。想到前期IOS中实现的雪花算法ID&#xff0c;照着其实现JS版本&#xff0c;供大家学习参考。 一、库的建立引入 在你项目中创建一个snowFlake.js的文件&#xff1a;拷贝以下…

《Windows核心编程》若干知识点应用实战分享

目录 1、进程的虚拟内存分区与小于0x10000的小地址内存区 1.1、进程的虚拟内存分区 1.2、小于0x10000的小地址内存区 2、保存线程上下文的CONTEXT结构体 3、从汇编代码角度去理解多线程运行过程的典型实例 4、调用TerminateThread强制结束线程会导致线程中的资源没有释放…

大数据关联规则挖掘:Apriori算法的深度探讨

文章目录 大数据关联规则挖掘&#xff1a;Apriori算法的深度探讨一、简介什么是关联规则挖掘&#xff1f;什么是频繁项集&#xff1f;什么是支持度与置信度&#xff1f;Apriori算法的重要性应用场景 二、理论基础项和项集支持度&#xff08;Support&#xff09;置信度&#xff…

vue3相比vue2的效率提升

1、静态提升 2、预字符串化 3、缓存事件处理函数 4、Block Tree 5、PatchFlag 一、静态提升 在vue3中的app.vue文件如下&#xff1a; 在服务器中&#xff0c;template中的内容会变异成render渲染函数。 最终编译后的文件&#xff1a; 1.静态节点优化 那么这里为什么是两部分…

[SwiftUI]自定义滚动菜单栏进行PageView页面切换

如图&#xff0c;自定义一个菜单栏&#xff0c;要求点击菜单按钮和滚动翻页步调统一。 首先有个分类模型 import Foundationstruct CategoryModel: Hashable {var categoryID: Int 0var categoryName: String ""} 基础实现代码如下&#xff0c;点击菜单和滚动页面…

C++(14)——string的模拟实现

前几篇文章中介绍了关于以及其相关函数的使用&#xff0c;为了更清楚的了解这些函数的作用&#xff0c;本篇文章通过模拟实现的方式来加深对于函数作用原理的理解。 目录 1. String的整体框架&#xff1a; 1.1 成员变量&#xff1a; 1.2 构造函数&#xff1a; 1.3 析构函数…

2023年总结我所经历的技术大变革

&#x1f4e2;欢迎点赞 &#xff1a;&#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff0c;赐人玫瑰&#xff0c;手留余香&#xff01;&#x1f4e2;本文作者&#xff1a;由webmote 原创&#x1f4e2;作者格言&#xff1a;新的征程&#xff0c;我们面对的不仅…

快速玩转 Mixtral 8x7B MOE大模型!阿里云机器学习 PAI 推出最佳实践

作者&#xff1a;熊兮、贺弘、临在 Mixtral 8x7B大模型是Mixtral AI推出的基于decoder-only架构的稀疏专家混合网络&#xff08;Mixture-Of-Experts&#xff0c;MOE&#xff09;开源大语言模型。这一模型具有46.7B的总参数量&#xff0c;对于每个token&#xff0c;路由器网络选…

Acwing 138 周赛 解题报告 | 珂学家 | 偏序 + DP构造

前言 整体评价 很久没做acwing周赛了, 之前vp过一些周赛&#xff0c;感觉风格变了。 这次感觉还可以&#xff0c;都是些眼熟的套路题。 A. 5458. 进水排水问题 思路: 签到题 按题意描述编写 import java.io.*; import java.util.*;public class Main {public static void …

解决 conda新建虚拟环境只有一个conda-meta文件&conda新建虚拟环境不干净

像以前一样通过conda 新建虚拟环境时发现环境一团糟&#xff0c;首先新建虚拟环境 conda create -n newenv这时候activate newenv&#xff0c;通过pip list&#xff0c;会发现有很多很多的包&#xff0c;都是我在其他环境用到的。但诡异的是&#xff0c;来到anaconda下env的目…

openEuler安装KVM

1、关闭防火墙和selinux [rootlocalhost ~]# systemctl stop firewalld[rootlocalhost ~]# setenforce 0 2、下载软件包 libvirt&#xff1a;用于管理虚拟化平台的开源的 API&#xff0c;后台程序和管理工具。 qemu&#xff1a;开源&#xff08;模拟&#xff09;软件&#…

【51单片机】IO 扩展(串转并)--74HC595

0、前言 参考&#xff1a; 普中 51 单片机开发攻略 第12章 【51单片机入门教程-2020版 程序全程纯手打 从零开始入门】 https://www.bilibili.com/video/BV1Mb411e7re/?p21&share_sourcecopy_web&vd_source77e36f24add8dc77c362748ffb980148 nop()是什么语句&#…

算法常用思路总结

思路 1. 求数组中最大最小值思路代码 2. 计算阶乘思路&#xff1a;代码&#xff1a; 3. 得到数字的每一位思路代码 4. 计算时间类型5. 最大公约数、最小公倍数6. 循环数组的思想题目&#xff1a;猴子选大王代码 补充经典例题1. 复试四则运算题目内容题解 2. 数列求和题目内容题…

8.1 Java与数据库连接_XML(❤)

8.1 Java与数据库连接_XML 1. XML介绍与用途2. XML语法规则3. XML语义约束3.1 DTD语法3.2 创建DTD文件3.3 XML Schema语法1. XML介绍与用途 2. XML语法规则

常见的代码生成器使用

常见的代码生成器使用 目录概述需求&#xff1a; 设计思路实现思路分析1.第一部分2.第二部分 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,make a better result,wait for cha…

Django随笔

关于Django的admin 1. 在url中把 from django.contrib import admin 重新解开 把path(admin/,admin.site.urls), 解开 2. 注册app&#xff0c;在配置文件中写 django.contrib.admin, 3.输入命令进行数据库迁移 Django国际化 配置文件中&#xff08;改成中文&#xff09; LA…

【STM32F103】DMA直接存储器访问游戏摇杆模块(ADCDMAEXTI)

前言&#xff08;可忽略&#xff09; 当初下定决心要走嵌入式的时候买了一堆传感器&#xff0c;但是因为懒和忙所以闲置了一堆&#xff0c;今天考完了最后一门&#xff0c;所以打算一个个都玩一遍&#xff0c;今天先从这个摇杆开始&#xff0c;当初买这个是想着以后做个遥控小…