【Linux】POSIX信号量与基于环形队列的生产消费者模型

news2025/3/30 14:37:45

目录

一、POSIX信号量:

接口:

二、基于环形队列的生产消费者模型

环形队列:

单生产单消费实现代码:

RingQueue.hpp:

main.cc:

多生产多消费实现代码:

RingQueue.hpp:

main.cc:


一、POSIX信号量:

在实现线程的同步,互斥不仅仅只有条件变量和锁,还有POSIX信号量,这里学习的POSIX信号量和之前学习的SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX可以用于线程间同步

引入信号量:

对于共享资源,为了保证其并发性,将其分成了几份资源,就允许几个线程进入共享资源访问,此时就引入了信号量来对其进行保护

信号量的本质是一个计数器

这把计数器用来描述临界资源中资源数目的多少,实际上是对资源的预加载机制(这就像在电影院买票,买票的本质就是对电影院座位的预加载机制,当你买到票了,就一定会有位置给你,并且别人即使有票也是坐别人的座位,也不会抢了你的座位)

虽然信号量的本质是一个计数器,当一个线程申请资源成功就将计数器--,当一个线程申请资源失败就将计数器++,但是不能用一个简简单单的普通变量代替信号量,因为变量的--和++操作不是原子的,所以,我们就要使用一个支持PV操作的原子的计数器------信号量

那么什么是PV操作呢?

P:代表申请资源,计数器--

V:代表释放资源,计数器++

当将共享资源分为N份,此时信号量也就是N,这个时候就能够申请资源,再将信号量--,当信号量为0的时候,线程就不能够申请资源了,只能阻塞等待

如上,这是一个多元信号量sem,我们之前学习的锁被叫做二元信号量

在使用多元信号量访问资源的时候,要先申请信号量,只有申请成功了,才能访问资源否则就需要进入阻塞队列等待

接口:

初始化信号量

参数1:需要初始化信号量的地址

参数2:表示的是线程共享还是进程共享,默认为零,线程共享,非零表示进程共享

参数3:设定的信号量的初始值

返回值:初始化成功返回0,失败返回-1,并设置错误码

其中sem_t实际上是一个联合体

销毁信号量

参数:就是需要销毁信号量的地址

返回值:初始化成功返回0,失败返回-1,并设置错误码

申请信号量:

其中下面用的是sem_wait,其功能就是成功将信号量-1,也就是P操作

参数:就是需要销毁信号量的地址

返回值:初始化成功返回0,失败返回-1,并设置错误码

sem_trywait:尝试申请,如果没有申请到资源,就会放弃申请

sem_timedwait:每隔一段时间进行申请

释放信号量(发布信号量)

参数:就是需要销毁信号量的地址

返回值:初始化成功返回0,失败返回-1,并设置错误码

其表示资源使用完毕,归还资源,成功将信号量+1,也就是V操作

二、基于环形队列的生产消费者模型

环形队列:

在实现生产消费者的模型中,不仅仅只有共享队列,还有环形队列,什么是环形队列呢?

虽然它叫环形队列,但是它不是队列,而是用数组实现的,
其中head作为头指针,当申请资源成功的时候就向后移动一位,
tail作为尾指针,当释放资源成功的时候向后移动一位,

首先,如何让数组成环呢?

在每次head++后都进行一次取模操作,这样保证head的大小不会超过这个环形队列的大小

特殊的是,当为空或者为满的时候,头指针和尾指针都指向同一个位置,那么如何证明此时是空还是满呢?

这里有两种方法:

方法一:添加一个计数器,当计数器的值为0的时候,表示当前为空,当计数器的值为容器大小的时候,表示该环形队列为满

判空条件:count == 0
判满条件:count == size

方法二:牺牲一个空间的大小,通过预留一个空位,避免head和tail重合时无法区分空和满。此时队列最大容量为size-1
判空条件:head== tail
判满条件:(head+ 1) % size == tail

在下面实现的时候采用计数器,毕竟信号量是一个天然的计数器

当数据不为空或者满的时候,此时head指针和tail指针必定不指向同一个位置,

此时就能够进行生产者和消费者的同时访问,

为空的时候,只能生产者访问,生产者只关注还剩多少空间

为满的时候,只能消费者访问,消费者只关注还剩多少数据

所以在使用信号量标识资源的情况下,生产者和消费者关注的资源不一样,所以就需要两个信号量来进行计数:

生产者的信号量:表示当前有多少可用空间

消费者的信号量:表示当前有多少可消费数据

所以以下在实现的时候,定义两个信号量,spacesem = N 和datasem = 0

对于生产者的PV操作:P(spacesem)将空间资源-1,V(datasem)将数据资源+1

对于消费者的PV操作:P(datasem)将数据资源-1,V(spacesem)将空间资源+1

单生产单消费实现代码:

RingQueue.hpp:

首先创建一个实现环形队列的文件:

#pragma once
#include <vector>
#include <iostream>
#include <semaphore.h>

template <class T>
class RingQueue
{
private:
    std::vector<T> _ringqueue; // 用vector模拟环形队列
    int _maxcap;               // 环形队列的最大容量

    int _p_step; // 生产者下标
    int _c_step; // 消费者下标

    sem_t _pspace_sem; // 生产者关注的空间资源
    sem_t _cdata_sem;  // 消费者关注的数据资源
};

接着依次实现其中的接口:

构造与析构

    RingQueue(int maxcap = 5)
        : _maxcap(maxcap), _ringqueue(maxcap), _p_step(0), _c_step(0)
    {
        sem_init(&_pspace_sem,0,maxcap);
        sem_init(&_cdata_sem,0,0);

        pthread_mutex_init(&_p_mutex,nullptr);
        pthread_mutex_init(&_c_mutex,nullptr);
    }
    ~RingQueue()
    {
        sem_destroy(&_pspace_sem);
        sem_destroy(&_cdata_sem);

        pthread_mutex_destroy(&_p_mutex);
        pthread_mutex_destroy(&_c_mutex);
    }

其中,构造函数的主要作用就是初始化各种变量,析构函数的主要作用就是释放这些变量

push与pop

push的作用是从交易场所中放入数据,pop的作用是从交易场所中拿到数据

    void push(const T& in)
    {
        //生产数据先要申请信号量来预定资源
        P(_pspace_sem);

        _ringqueue[_p_step] = in;//将所对应的数据放入到环形队列中
        _p_step++;//将生产者对应的下标向后移动一位
        _p_step %= _maxcap;//保证生产者不会超过环形队列的大小

        V(_cdata_sem);
    }

    void pop(T *out)
    {
        P(_cdata_sem);

        pthread_mutex_lock(&_c_mutex);
        *out = _ringqueue[_c_step];//将该位置的数据交给out作为输出型参数带出去
        _c_step++;//将消费者对应的下标向后移动一位
        _c_step %= _maxcap;//保证消费者下标不会超过环形队列的大小
        pthread_mutex_unlock(&_c_mutex);

        V(_pspace_sem);
    }

生产者push后,证明环形队列中一定有数据,所以就需要在V后传入消费者关心的信号量,也就是需要传递_cdata_sem

消费者pop后,证明环形队列中一定有空间,所以就需要在V后传入生产者关心的信号量,也就是需要传递_pspace_sem

PV操作:

    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }

    void V(sem_t &sem)
    {
        sem_post(&sem);
    }

P操作代表申请资源,也就是semwait这个函数

V操作就是释放了资源,比如生产者就是释放了一个数据

这里要保证数据在为空的时候只能生产者运行,在数据为满的时候只能消费者去运行,

所以wait是为了保持顺序同步,保证即使消费者先调用,但是没有数据,就将消费者申请资源所关注的数据信号量送去等待队列里去等待

在封装V操作中,post就是释放资源,对于生产者就是给了个数据给消费者

对于消费者post就是释放了空间,生产者就能接着生产了

那消费者一开始调用P操作,没有数据就会阻塞,而生产者这边V了数据,消费者这边P就不会阻塞了可以拿到数据了

所以生产和消费这两者的PV操作是反的

生产者V了,消费者的p就停止阻塞了因为生产者给了消费者资源了

反之同理

main.cc:

void *Productor(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);

    while (true)
    {
        int data = rand()%10+1;
        rq->push(data);
        
        std::cout<<"Productor : data = "<< data << std::endl;

        sleep(1);
    }
    
    return nullptr;
}

void *Consumer(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);

    while (true)
    {
        int data = 0;
        rq->pop(&data);
        std::cout<<"Consumer : data = "<< data << std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand(time(nullptr)^getpid());
    RingQueue<Task> *rq = new RingQueue<Task>();

    pthread_t c, p;

    pthread_create(&p, nullptr, Productor, rq);
    pthread_create(&c, nullptr, Consumer, rq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    
    delete rq;
    return 0;
}

或者也可以让消费者疯狂消费数据,生产者疯狂生产

多生产多消费实现代码:

RingQueue.hpp:

在多生产多消费中,需要保证生产者和生产者之间、消费者和消费者之间的互斥关系,生产者和消费者之间的互斥关系已经由信号量承担了

所以在多生产多消费的代码中要加上锁

构造与析构中也要增加初始化锁与释放锁

template <class T>
class RingQueue
{
public:
    RingQueue(int maxcap = 5)
        : _maxcap(maxcap), _ringqueue(maxcap), _p_step(0), _c_step(0)
    {
        sem_init(&_pspace_sem,0,maxcap);
        sem_init(&_cdata_sem,0,0);

        pthread_mutex_init(&_p_mutex,nullptr);
        pthread_mutex_init(&_c_mutex,nullptr);
    }
    ~RingQueue()
    {
        sem_destroy(&_pspace_sem);
        sem_destroy(&_cdata_sem);

        pthread_mutex_destroy(&_p_mutex);
        pthread_mutex_destroy(&_c_mutex);

    }

private:
    std::vector<T> _ringqueue; // 用vector模拟环形队列
    int _maxcap;               // 环形队列的最大容量

    int _p_step; // 生产者下标
    int _c_step; // 消费者下标

    sem_t _pspace_sem; // 生产者关注的空间资源
    sem_t _cdata_sem;  // 消费者关注的数据资源

    pthread_mutex_t _p_mutex;//保证生产者和生产者之间的互斥
    pthread_mutex_t _c_mutex;//保证消费者和消费者之间的互斥
};

push与pop

    void push(const T& in)
    {
        //生产数据先要申请信号量来预定资源
        P(_pspace_sem);//信号量的申请本来就是原子的,所以加锁的时候就需要在这之后

        pthread_mutex_lock(&_p_mutex);
        
        _ringqueue[_p_step] = in;//将所对应的数据放入到环形队列中
        _p_step++;//将生产者对应的下标向后移动一位
        _p_step %= _maxcap;//保证生产者下标不会超过环形队列的大小

        pthread_mutex_unlock(&_p_mutex);

        V(_cdata_sem);
    }

    void pop(T *out)
    {
        P(_cdata_sem);

        pthread_mutex_lock(&_c_mutex);
        *out = _ringqueue[_c_step];//将该位置的数据交给out作为输出型参数带出去
        _c_step++;//将消费者对应的下标向后移动一位
        _c_step %= _maxcap;//保证消费者下标不会超过环形队列的大小
        pthread_mutex_unlock(&_c_mutex);

        V(_pspace_sem);
    }

细节:

在加锁的时候要在申请信号量之后,这样能够提高并发度

如果是在申请信号量之前进行加锁,那么申请信号量的线程永远只有一个  不能够提高并发度

理解:

就像在电影院中,是先买票在进行排队的,这样能够加快进场的速度,如果排队后再买票,需要一人一人地进行操作,这相比上一种就会很慢的

申请信号量的操作是原子的,不需要加锁保护也能保证线程安全,所以并发申请信号量,串行访问临界资源能够提高并发度

main.cc:

在进行生产消费者模型中的数据问题,不仅仅是让二者看到同一份资源,更重要的是让消费者拿到资源并对资源进行处理,这里引入上一章的Task文件来进行数据处理

Task.hpp

#include <iostream>
#include <string>

std::string opers = "+-*/%";

enum
{
    Divzero = 1,
    Modzero,
    Unknow
};

class Task
{
public:
    Task()
    {}
    Task(int data1, int data2, char oper)
        : _data1(data1), _data2(data2), _oper(oper),_exitcode(0)
    {}

    void run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        case '-':
            _result = _data1 - _data2;
            break;
        case '*':
            _result = _data1 * _data2;
            break;
        case '/':
            if (_data2 == 0)
                _exitcode = Divzero;
            else
                _result = _data1 / _data2;
            break;
        case '%':
            if (_data2 == 0)
                _exitcode = Modzero;
            else
                _result = _data1 % _data2;
            break;
        default:
            _exitcode = Unknow;
            break;
        }
    }

    void operator()()
    {
        run();
    }

    std::string Getresult()
    {
        std::string ret = std::to_string(_data1);
        ret += _oper;
        ret += std::to_string(_data2);
        ret += "=";
        ret += std::to_string(_result);
        ret += "[exitcode=";
        ret += std::to_string(_exitcode);
        ret += "]";
        return ret;
    }

    std::string GetTask()
    {
        std::string ret = std::to_string(_data1);
        ret += _oper;
        ret += std::to_string(_data2);
        ret += "=?";
        return ret;
    }
    
    ~Task()
    {}

private:
    int _data1;
    int _data2;
    char _oper;

    int _exitcode;
    int _result;
};

接着在生产消费者的线程所执行的对应的方法中,基本和上一章类似

void *Productor(void *args)
{
    ThreadData *td = static_cast<ThreadData *>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;
    int len = opers.size();
    while (true)
    {
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand()%len];
        Task t(data1,data2,op);
        rq->push(t);
        std::cout<<"Productor : Task = "<< t.GetTask() << " who "<< name << std::endl;
        sleep(1);
    }
    
    return nullptr;
}

void *Consumer(void *args)
{
    ThreadData *td = static_cast<ThreadData *>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;
    while (true)
    {
        Task t;
        rq->pop(&t);
        //处理数据
        t();
        std::cout << "Consumer : Task = " << t.GetTask() << " who: " << name << " result: " << t.Getresult() << std::endl;
        // sleep(1);
    }
    return nullptr;
}

我们也可以创建一个结构体来存储线程名称与任务

struct ThreadData
{
    RingQueue<Task> *rq;
    std::string threadname;
};
int main()
{
    srand(time(nullptr));
    RingQueue<Task> *rq = new RingQueue<Task>();
    pthread_t c[5], p[3];
    for(int i = 0;i<3;i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Productor-" + std::to_string(i);

        pthread_create(p+i, nullptr, Productor, td);
        usleep(10);

    }
    sleep(1);
    for(int i = 0;i<5;i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Consumer-" + std::to_string(i);

        pthread_create(c+i, nullptr, Consumer, td);
        usleep(10);
    }
    for(int i = 0;i<3;i++)
    {
        pthread_join(p[i], nullptr);
    }
    for(int i = 0;i<5;i++)
    {
        pthread_join(c[i], nullptr);
    }
    return 0;
}

注意:在环形队列中允许多个生产者线程一起进行生活数据,也允许多个消费者线程一起消费数据,多个线程一起操作并非同时操作,任务开始时间有先后,但都是在进行处理的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2322675.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot 连接 MySQL 配置参数详解

Spring Boot 连接 MySQL 配置参数详解 前言参数及含义常用参数及讲解和示例useUnicode 参数说明&#xff1a; 完整配置示例注意事项 前言 在 Spring Boot 中使用 Druid 连接池配置 MySQL 数据库连接时&#xff0c;URL 中 ? 后面的参数用于指定连接的各种属性。以下是常见参数…

[linux] linux基本指令 + shell + 文件权限

目录 1. Linux的认识 1.1. Linux的应用场景 1.2. Linux的版本问题 1.3. 操作系统的认识 1.4. 常用快捷键 2. 常用指令介绍 2.1. ADD 2.1.1. touch [file] 2.1.1.1. 文件的属性信息 2.1.2. mkdir [directory] 2.1.3. cp [file/directory] 2.1.4. echo [file] 2.1.4.…

Python实现小红书app版爬虫

简介&#xff1a;由于数据需求的日益增大&#xff0c;小红书网页版已经不能满足我们日常工作的需求&#xff0c;为此&#xff0c;小编特地开发了小红书手机版算法&#xff0c;方便大家获取更多的数据&#xff0c;提升工作效率。 手机版接口主要包括&#xff1a;搜素&#xff0…

【docker】docker-compose安装RabbitMQ

docker-compose安装RabbitMQ 1、配置docker-compose.yml文件&#xff08;docker容器里面的目录请勿修改&#xff09;2、启动mq3、访问mq4、查看服务器映射目录5、踩坑5.1、权限不足 1、配置docker-compose.yml文件&#xff08;docker容器里面的目录请勿修改&#xff09; versi…

playwright-go实战:自动化登录测试

1.新建项目 打开Goland新建项目playwright-go-demo 项目初始化完成后打开终端输入命令&#xff1a; #安装项目依赖 go get -u github.com/playwright-community/playwright-go #安装浏览器 go run github.com/playwright-community/playwright-go/cmd/playwrightlatest insta…

LeetCode hot 100 每日一题(13)——73. 矩阵置零

这是一道难度为中等的题目&#xff0c;让我们来看看题目描述&#xff1a; 给定一个 _m_ x _n_ 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 提示&#xff1a; m matrix.lengthn matrix[0].length1 < m, n …

Unity URP自定义Shader支持RenderLayer

前言&#xff1a; 当我们想用一个灯光只对特定的物体造成影响&#xff0c;而不对其余物体造成影响时&#xff0c;我们就需要设置相对应的LightLayer&#xff0c;但是这在URP12.0是存在的&#xff0c;在之后就不存在LightLayer这一功能&#xff0c;URP将其隐藏而改成了RenderLa…

Axure项目实战:智慧城市APP(完整交互汇总版)

亲爱的小伙伴&#xff0c;在您浏览之前&#xff0c;烦请关注一下&#xff0c;在此深表感谢&#xff01; 课程主题&#xff1a;智慧城市APP 主要内容&#xff1a;主功能&#xff08;社保查询、医疗信息、公交查询等&#xff09;、活动、消息、我的页面汇总 应用场景&#xff…

架构思维:预约抢茅子架构设计

文章目录 案例&#xff1a;预约抢茅子复杂度分析商品预约阶段等待抢购阶段商品抢购阶段订单支付阶段 技术方案商品预约阶段一、基于 Redis 单节点的分布式锁方案1. 核心流程2. 关键设计点 二、Redis 单节点方案的局限性1. 单点故障风险2. 主从切换问题 三、多节点 Redis 实现高…

基于SpringBoot+Vue的在教务管理(课程管理)系统+LW示例

1.项目介绍 系统角色&#xff1a;管理员、学生、教师功能模块&#xff1a;管理员&#xff08;学院管理、专业管理、班级管理、学生管理、教师管理、课程管理、选课修改&#xff09;、教师&#xff08;授课查询、教师课表、成绩录入&#xff09;、学生&#xff08;选修课程、学…

ubuntu桌面图标异常——主目录下的所有文件(如文档、下载等)全部显示在桌面

ubuntu桌面图标异常 问题现象问题根源系统级解决方案方法一:全局修改(推荐多用户环境)方法二:单用户修改(推荐个人环境)操作验证与调试避坑指南扩展知识参考文档问题现象 主目录文件异常显示 用户主目录(如/home/user/)下的所有文件(如文档、下载等)全部显示在桌面,…

sql结尾加刷题

找了一下mysql对extractvalue()、updatexml()函数的官方介绍https://dev.mysql.com/doc/refman/5.7/en/xml-functions.html#function_extractvalue ExtractValue(xml_frag, xpath_expr) 知识点 解释一下这两个参数xml_frag&#xff0c;是xml标记片段&#xff0c;第二个参数…

Linux学习笔记(应用篇三)

基于I.MX6ULL-MINI开发板 LED学习GPIO应用编程输入设备 开发板中所有的设备&#xff08;对象&#xff09;都会在/sys/devices 体现出来&#xff0c;是 sysfs 文件系统中最重要的目录结构 /sys下的子目录说明/sys/devices这是系统中所有设备存放的目录&#xff0c;也就是系统中…

【redis】事务详解,相关命令multi、exec、discard 与 watch 的原理

文章目录 什么是事务原子性一致性持久性隔离性 优势与 MySQL 对比用处 事务相关命令开启事务——MULTI执行事务——EXEC放弃当前事务——DISCARD监控某个 key——WATCH作用场景使用方法实现原理 事务总结 什么是事务 MySQL 事务&#xff1a; 原子性&#xff1a;把多个操作&am…

数据库基础知识点(系列七)

视图和索引相关的语句 1&#xff0e;引入视图的主要目的是什么? 答&#xff1a;数据库的基本表是按照数据库设计人员的观点设计的&#xff0c;并不一定符合用户的需求。SQL Server 2008可以根据用户需求重新定义表的数据结构&#xff0c;这种数据结构就是视图。视图是关系数据…

3.3 Taylor公式

1.定义 1.1 taylor公式 1.2 麦克劳林公式 1.3 推论 1.4 拉格朗日余项和皮亚诺型余项 2. 例题 3.几种特殊函数的麦克劳林展开

2000-2019年各省地方财政行政事业性收费收入数据

2000-2019年各省地方财政行政事业性收费收入数据 1、时间&#xff1a;2000-2019年 2、来源&#xff1a;国家统计局、统计年鉴 3、指标&#xff1a;行政区划代码、地区、年份、地方财政行政事业性收费收入 4、范围&#xff1a;31省 5、指标说明&#xff1a;地方财政行政事业…

Ftrans飞驰云联受邀参加“2025汽车零部件CIO年会“并荣获智象奖

2025年3月6日&#xff0c;由栖观汽车、栖观资讯和飞羽商务主办的“2025第二届中国汽车&零部件CIO年会暨智象奖颁奖盛典”于上海盛大召开&#xff0c;Ftrans飞驰云联作为国内领先的企业文件传输与数据交换解决方案提供商&#xff0c;受邀出席了年会&#xff0c;并凭借卓越的…

oracle查询归档日志使用量

1.统计最近30天的数据 SELECT TRUNC(first_time, DD) "日期", SUM(blocks * block_size) / 1024 / 1024 / 1024 "大小(GB)" FROM v$archived_log WHERE first_time > SYSDATE - 30 -- 统计最近30天的数据 GROUP BY TRUNC(first_time, DD) ORDER BY 1 D…