【Linux】POSIX信号量基于环形队列的生产消费模型

news2025/1/10 17:44:28

需要云服务器等云产品来学习Linux的同学可以移步/–>腾讯云<–/官网,轻量型云服务器低至112元/年,优惠多多。(联系我有折扣哦)

文章目录

  • 引入
  • 1. POSIX信号量
    • 1.1 信号量的概念
    • 1.2 信号量的使用
      • 1.2.1 信号量的初始化
      • 1.2.2信号量的销毁
      • 1.2.3 等待信号量(P操作)
      • 1.2.4 发布信号量(V操作)
  • 2. 基于环形队列的生产消费模型
    • 2.1 环形队列的概念
    • 2.2 使用环形队列和信号量来实现生产消费模型
      • 2.2.1 实现前的一些思考和注意事项
    • 2.2.2 代码实现与测试

引入

在我们之前实现的BlockQueue中,实际上是存在一些缺陷的,可以进行一些优化

image-20240207213422042

一个线程,在操作临界资源的时候,临界资源一定要是满足指定条件的,但是临界资源是否满足条件,我们在没有访问之前是无法得知的,所以只能先加锁、再检测、再操作、再解锁

所以检测是必不可少的,检测的本质也是再访问临界资源,由于我们对资源整体进行了加锁,就默认了我们对资源的整体使用,但是实际上也有可能存在对于同一份公共资源,是允许同时访问不同区域的,这个可以由程序员通过编码来控制不同区域的访问。

有没有一种方法能够提前得知当前情况下临界资源是否满足指定条件,直接让线程等待或者访问,当然是可以的,这就需要用到**信号量**

1. POSIX信号量

1.1 信号量的概念

在之前的文章中我们提到过信号量【Linux】进程间通信——system V 共享内存、消息队列、信号量 但是当时讲的是system V版本的信号量。这里是POSIX信号量。POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

那么什么是信号量呢?

  • 信号量的本质就是一个计数器,用于衡量临界资源中资源数量多少的计数器

  • 只要拥有信号量,就表示在未来一定能够拥有临界资源的一部分

申请信号量的本质就是**对临界资源中的特定的一小部分资源的预定**

工作机制:

用信号量表示当前临界资源被划分后的小块的个数,任何线程在使用临界资源之前都先对信号量进行申请,如果申请到信号量就表示能够访问到一部分临界资源,如果没有申请到就表示当前条件不就绪,线程需要进行等待

根据工作机制的分析,我们知道,信号量这个计数器需要进行自增和自减的操作

但是想一个问题:线程在访问临界资源之前都需要申请信号量 ==> 线程都要看到这个信号量 ==> 信号量本身就是一个临界资源 ==> 信号量的操作一定要是原子的

而事实和我们的推理结论是一致的,信号量的自增和自减都是原子的操作

  • 其中信号量自减的操作就表示申请资源,也叫做P操作;信号量自增的操作就表示释放资源,也叫做V操作

  • 信号量的核心操作也就是:PV原语

  • 如果信号量的值为1,也就可以实现互斥锁

1.2 信号量的使用

1.2.1 信号量的初始化

image-20240207223352439

头文件:
#include <semaphore.h>
函数原型:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数解释:
	sem:要初始化的信号量
	pshared:该信号量的共享方式,0表示线程间共享,非零表示进程间共享
	value:信号量的初值(临界资源的个数)
函数描述:
	初始化一个信号量
返回值:
	调用成功返回0,否则返回-1同时设置错误码

1.2.2信号量的销毁

image-20240207223413534

头文件:
#include <semaphore.h>
函数原型:
int sem_destroy(sem_t *sem);
参数解释:
	sem:要释放的信号量
函数描述:
	释放一个信号量
返回值:
	调用成功返回0,否则返回-1同时设置错误码

1.2.3 等待信号量(P操作)

image-20240207223428063

头文件:
#include <semaphore.h>
函数原型:
int sem_wait(sem_t *sem);
参数解释:
	sem:要等待(申请)的信号量
函数描述:
	申请一个信号量,如果当前信号量无法被申请,就自动挂起,等待能够被申请的时候再申请。调用成功后会将信号量的值减一
返回值:
	调用成功返回0,否则返回-1同时设置错误码

1.2.4 发布信号量(V操作)

image-20240207223446448

头文件:
#include <semaphore.h>
函数原型:
int sem_post(sem_t *sem);
参数解释:
	sem:需要发布(释放)的信号量
函数描述:
	释放一个信号量
返回值:
	调用成功返回0,否则返回-1同时设置错误码

2. 基于环形队列的生产消费模型

2.1 环形队列的概念

环形队列之前我们就了解过了,只要是环形队列,就存在判空判满的问题。实际上并不是真正的环形队列,而是通过数组模拟的,当数据加入到最后的位置时直接数组的大小即可。通常情况下,判空判满的问题我们是通过空出一个位置,当两个指针指向同一个位置的时候是空,当只剩一个位置的时候就是满,但是我们这里不需要关注。

image-20240207224728527

2.2 使用环形队列和信号量来实现生产消费模型

2.2.1 实现前的一些思考和注意事项

我们在上一节的时候讲到,要维护生产者消费者模型,就需要维护“321原则”,也就是三种关系,两个角色,一个交易场所

在环形队列中,三个关系就是生产者与生产者之前的关系(互斥),消费者与消费者之间的关系(互斥),生产者与消费者之间的关系(无关)

两个角色:生产者和消费者

一个交易场所:环形队列

但是,由于环形队列的特殊结构,其中的任意一个位置都可以被访问到,所以一个生产者和一个消费者可以同时访问这个队列中的不同位置,只有生产者和消费者处于同一个位置的时候才需要考虑互斥与同步的问题

所以我们的核心工作就是

  • 消费者不能超过生产者
  • 生产者不能给消费者套圈
  • 当生产者和消费者处于同一位置的时候,如果此时队列为空就让生产者先走,如果此时队列为满就让消费者先走

那么信号量所表示的资源又如何理解呢?

对于生产者,他只关注这个队列中的剩余空间,因为他需要空间来存放生产的结果

对于消费者,他只需要关注这个队列的数据资源,因为需要消耗数据

所以最终我们定义两个信号量,分别表示剩余空间和已有的数据

如何维护生产者和生产者的关系,消费者和消费者的关系

这里各个生产者之间需要互斥,消费者也需要互斥,但是生产者和消费者之间没有任何关系,最终为了维护这种互斥关系,我们就需要两个不相关的锁分别管理生产者和消费者

2.2.2 代码实现与测试

/*RingQueue.hpp*/
#pragma once
#include <vector>
#include <cassert>
#include <semaphore.h>
#include <pthread.h>

const int gnum = 5; // 默认的环形队列大小

template <class T>
class RingQueue
{
public:
    RingQueue(const int &num = gnum) : _queue(num) // 构造函数
    {
        // 初始化两个信号量
        int n = sem_init(&_spaceSem, 0, num);
        assert(n == 0);
        n = sem_init(&_dataSem, 0, 0);
        assert(0 == n);
        _productorStep = _consumerStep = 0; // 初始化生产者和消费者需要访问的位置
    }

    void push(const T &in) // 生产者使用空间
    {
        // 申请信号量之前,数据已经并发的生产完成
        P(_spaceSem);                    // 在向队列里面push数据之前申请_spaceSem信号量
        pthread_mutex_lock(&_pmutex);    // 在申请信号量之后考虑多个生产线程的互斥访问的问题
        _queue[_productorStep++] = in;   // 申请到信号量之后队列中指定位置插入数据
        _productorStep %= _queue.size(); // 更新生产者下标
        pthread_mutex_unlock(&_pmutex);  // 解锁
        V(_dataSem);                     // 插入数据之后,可使用的数据就要增加,所以使用V操作释放_dataSem
    }

    void pop(T *out) // 消费者使用数据
    {
        P(_dataSem);
        pthread_mutex_lock(&_cmutex);
        *out = _queue[_consumerStep++];
        _consumerStep %= _queue.size();
        pthread_mutex_unlock(&_cmutex);
        V(_spaceSem);
    }

    ~RingQueue() // 析构函数
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
    }

private:
    // 封装成私有的PV操作
    void P(sem_t &sem)
    {
        int n = sem_wait(&sem);
        assert(n == 0);
        (void)n;
    }
    void V(sem_t &sem)
    {
        int n = sem_post(&sem);
        assert(n == 0);
        (void)n;
    }

private:
    std::vector<T> _queue;
    sem_t _spaceSem;         // 生产者看重的剩余空间信号量
    sem_t _dataSem;          // 消费者看重的存在的数据个数信号量
    int _productorStep;      // 生产者要访问的下标
    int _consumerStep;       // 消费者要访问的下标
    pthread_mutex_t _pmutex; // 维护生产者关系的锁
    pthread_mutex_t _cmutex; // 维护消费者关系的锁
};

测试1:使用整数测试单生产单消费

#include "RingQueue.hpp"
#include <cstdlib>
#include <ctime>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>

void *productor(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
    while(true)
    {
        //生产数据
        int data = rand() % 10 + 1; 
        rq->push(data);
        std::cout << "生产数据成功,生产的数据是" << data << std::endl;
        // sleep(1); // 让生产慢一点
    }
}
void *consumer(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
    while(true)
    {
        //消费数据
        int data;
        rq->pop(&data);
        std::cout << "消费数据成功,消费的数据是" << data << std::endl;
        sleep(1); // 让消费慢一点
    }
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid()); // 种随机数种子,为了让产生的随机数更随机
    RingQueue<int> *rq = new RingQueue<int>();
    pthread_t p,c;
    pthread_create(&p, nullptr, productor, rq);
    pthread_create(&c, nullptr, consumer, rq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    return 0;
}

image-20240207234237194

测试2:使用Task测试单生产单消费

/*Task*/
#pragma once

#include <string>
#include <iostream>
#include <functional>

static std::string oper = "+-*/%";

class CalTask
{
public:
    using func_t = std::function<int(int, int, char)>;

public:
    CalTask() {}
    CalTask(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _callback(func)
    {
    }
    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[64];
        snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }
    std::string toTaskString()
    {
        char buffer[64];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
class SaveTask
{
    typedef std::function<void(const std::string)> func_t;

public:
    SaveTask() {}
    SaveTask(const std::string &msg, func_t func)
        : _msg(msg), _func(func)
    {
    }
    void operator()()
    {
        _func(_msg);
    }

private:
    std::string _msg;
    func_t _func;
};
int mymath(int a, int b, char op)
{
    int ans = 0;
    switch (op)
    {
    case '+':
        ans = a + b;
        break;
    case '-':
        ans = a - b;
        break;
    case '*':
        ans = a * b;
        break;
    case '/':
    {
        if (b == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            ans = -1;
        }
        else
            ans = a / b;
    }
    break;
    case '%':
    {
        if (b == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            ans = -1;
        }
        else
            ans = a % b;
    }
    break;
    default:
        break;
    }
    return ans;
}
void Save(const std::string &msg)
{
    FILE *fp = fopen("./log.txt", "a+");
    if (fp == NULL)
    {
        std::cerr << "open file error" << std::endl;
        return;
    }
    fputs(msg.c_str(), fp);
    fputs("\n", fp);

    fclose(fp);
}
/*main.cc*/
#include "RingQueue.hpp"
#include "Task.hpp"
#include <cstdlib>
#include <ctime>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>

void *productor(void *args)
{
    RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(args);
    while(true)
    {
        //生产数据
        int x = rand() % 10 + 1; 
        int y = rand() % 10 + 1;
        int opNum = rand() % oper.size();
        CalTask t(x, y, oper[opNum], mymath);
        rq->push(t);
        std::cout << "生产数据成功" << t.toTaskString() << std::endl;
        sleep(1); // 让生产慢一点
    }
}
void *consumer(void *args)
{
    RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(args);
    while(true)
    {
        //消费数据
        CalTask t;
        rq->pop(&t);
        std::string result = t();
        std::cout << "消费数据成功 " << result << std::endl;
        // sleep(1); // 让消费慢一点
    }
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid()); // 种随机数种子,为了让产生的随机数更随机
    RingQueue<CalTask> *rq = new RingQueue<CalTask>();
    pthread_t p,c;
    pthread_create(&p, nullptr, productor, rq);
    pthread_create(&c, nullptr, consumer, rq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    return 0;
}

image-20240207235009665

测试3:多生产多消费的测试

#include "RingQueue.hpp"
#include "Task.hpp"
#include <cstdlib>
#include <ctime>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>

void *productor(void *args)
{
    RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(args);
    while(true)
    {
        //生产数据
        int x = rand() % 10 + 1; 
        int y = rand() % 10 + 1;
        int opNum = rand() % oper.size();
        CalTask t(x, y, oper[opNum], mymath);
        rq->push(t);
        std::cout << "生产数据成功" << t.toTaskString() << std::endl;
        sleep(1); // 让生产慢一点
    }
}
void *consumer(void *args)
{
    RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(args);
    while(true)
    {
        //消费数据
        CalTask t;
        rq->pop(&t);
        std::string result = t();
        std::cout << "消费数据成功 " << result << std::endl;
        // sleep(1); // 让消费慢一点
    }
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid()); // 种随机数种子,为了让产生的随机数更随机
    RingQueue<CalTask> *rq = new RingQueue<CalTask>();
    pthread_t p[4],c[8];
    for(int i = 0; i < 4; ++i) pthread_create(p + i, nullptr, productor, rq);
    for(int i = 0; i < 8; ++i) pthread_create(c + i, nullptr, consumer, rq);

    for(int i = 0; i < 4; ++i) pthread_join(p[i], nullptr);
    for(int i = 0; i < 8; ++i) pthread_join(c[i], nullptr);
    return 0;
}

image-20240208001217869


多生产多消费的意义:不管是环形队列还是阻塞队列,多线程的意义在于构建or获取任务是要花时间的,效率比较低,当消费的时候也是要花时间的,不单单只是拿出来就行了,所以多生产多消费的时候的意义在于生产之前,消费之后,处理任务获取任务的时候本身也是要花费时间的,可以在生产之前与消费之后让线程并行执行。

条件变量是一种同步机制,它允许线程等待某个条件的发生,通常与互斥锁一起使用。而信号量是一种计数器,它可以用于控制对共享资源的访问;如果想让每一刻只有一个线程访问共享资源,可以使用条件变量。但如果需要允许多个线程并发访问共享资源的不同区域,则可以使用信号量


本节完…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1440831.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI助力农作物自动采摘,基于DETR(DEtection TRansformer)开发构建番茄采摘场景下番茄成熟度检测识别计数分析系统

去年十一那会无意间刷到一个视频展示的就是德国机械收割机非常高效自动化地24小时不间断地在超广阔的土地上采摘各种作物&#xff0c;专家设计出来了很多用于采摘不同农作物的大型机械&#xff0c;看着非常震撼&#xff0c;但是我们国内农业的发展还是相对比较滞后的&#xff0…

IOS破解软件安装教程

对于很多iOS用户而言&#xff0c;获取软件的途径显得较为单一&#xff0c;必须通过App Store进行下载安装。 这样的限制&#xff0c;时常让人羡慕安卓系统那些自由下载各类版本软件的便捷。 心中不禁生出疑问&#xff1a;难道iOS世界里&#xff0c;就不存在所谓的“破解版”软件…

Visual Studio 2010+C#实现信源编码

1. 要求 本文设计了一套界面系统&#xff0c;该系统能够实现以下功能&#xff1a; 克劳夫特不等式的计算&#xff0c;并且能够根据计算结果给出相应的信息。可通过用户输入的初始条件然后给出哈夫曼编码以及LZ编码&#xff0c;结果均通过对话框来显示哈夫曼编码结果包含相应的…

解密输入输出迷局:蓝桥杯与ACM中C++/C语言常见问题揭秘

关于C中的常见输入输出汇总 带空格的字符串&#xff1a; ​ 对于这种输入方式我们选择使用gets() 函数来进行输入&#xff0c;gets用于从标准输入&#xff08;通常是键盘&#xff09;读取一行文本并将其存储为字符串&#xff0c;直到遇到换行符&#xff08;‘\n’&#xff09…

Fink CDC数据同步(四)Mysql数据同步到Kafka

依赖项 将下列依赖包放在flink/lib flink-sql-connector-kafka-1.16.2 创建映射表 创建MySQL映射表 CREATE TABLE if not exists mysql_user (id int,name STRING,birth STRING,gender STRING,PRIMARY KEY (id) NOT ENFORCED ) WITH (connector mysql-cdc,hostn…

飞书上传图片

飞书上传图片 1. 概述1.1 访问凭证2. 上传图片获取image_key1. 概述 飞书开发文档上传图片: https://open.feishu.cn/document/server-docs/im-v1/image/create 上传图片接口,支持上传 JPEG、PNG、WEBP、GIF、TIFF、BMP、ICO格式图片。 在请求头上需要获取token(访问凭证) …

go消息队列RabbitMQ - 订阅模式-fanout

1、发布订阅 订阅模式&#xff0c;消息被路由投递给多个队列&#xff0c;一个消息被多个消费者获取。 1&#xff09; 可以有多个消费者 2&#xff09; 每个消费者有自己的queue&#xff08;队列&#xff09; 3&#xff09; 每个队列都要绑定到Exchange&#xff08;交换机&…

Linux系统安装(CentOS Vmware)

学习环境安装 VMware安装 VMware下载&安装 访问官网&#xff1a;https://www.vmware.com 在此处可以选择语言 点击China&#xff08;简体中文&#xff09; 点击产品&#xff0c;点击Workstation Pro 下滑&#xff0c;点击下载试用版 下滑找到Workstation 17 Pro for Wi…

ARP欺骗攻击利用之内网截取图片

Arp欺骗&#xff1a;目标ip的流量经过我的网卡&#xff0c;从网关出去。 Arp断网&#xff1a;目标ip的流量经过我的网卡 1. echo 1 >/proc/sys/net/ipv4/ip_forward 设置ip流量转发&#xff0c;不会出现断网现象 有时不能这样直接修改&#xff0c;还有另外一种方法 修…

基于华为云欧拉操作系统(HCE OS)容器化部署传统应用(Redis+Postgresql+Git+SpringBoot+Nginx)

写在前面 博文内容为 华为云欧拉操作系统入门级开发者认证(HCCDA – Huawei Cloud EulerOS)实验笔记整理认证地址&#xff1a;https://edu.huaweicloud.com/certificationindex/developer/9bf91efb086a448ab4331a2f53a4d3a1博文内容涉及一个传统 Springboot 应用HCE部署&#x…

使用QT编写一个简单QQ登录界面

widget.cpp #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);//设置窗口标题this->setWindowTitle("QQ");//设置窗口图标this->setWindowIcon(…

uniapp的配置和使用

①安装环境和编辑器 注册小程序账号 微信开发者工具下载 uniapp 官网 HbuilderX 下载 首先先下载Hbuilder和微信开发者工具 &#xff08;都是傻瓜式安装&#xff09;&#xff0c;然后注册小程序账号&#xff1a; 拿到appid&#xff1a; ②简单通过demo使用微信开发者工具和…

Linux——进程池(管道)

经过了管道的介绍之后&#xff0c;我们可以实现了进程间通信&#xff0c;现在我就来简单介 绍一下管道的应用场景——进程池。1. 引入 在我们的编码过程中&#xff0c;不乏会听到&#xff0c;内存池&#xff0c;进程池&#xff0c;空间配置器等等名词&#xff0c;这些是用来干…

spring boot学习第十二篇:mybatis框架中调用存储过程控制事务性

1、MySQL方面&#xff0c;已经准备好了存储过程&#xff0c;参考&#xff1a;MYSQL存储过程&#xff08;含入参、出参&#xff09;-CSDN博客 2、pom.xml文件内容如下&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"…

zer0pts-2020-memo:由文件偏移处理不正确--引发的堆溢出

启动脚本 #!/bin/sh qemu-system-x86_64 \-m 256M \-kernel ./bzImage \-initrd ./rootfs.cpio \-append "root/dev/ram rw consolettyS0 oopspanic panic1 kaslr quiet" \-cpu kvm64,smep,smap \-monitor /dev/null \-nographic -enable-kvm/ # dmesg | grep page …

电商小程序01需求分析

目录 1 电商用例分析2 功能架构3 原型开发3.1 首页3.2 店铺页面3.3 配货单3.4 配货单有货3.5 我的应用3.6 商品详情3.7 订单确认3.8 收货地址3.9 店铺详情3.10 店铺分类3.11 商品分类 总结 低代码学习的时候最高效的方法就是带着问题去学习&#xff0c;一般可以先从电商小程序开…

507. Perfect Number(完美数)

题目描述 对于一个 正整数&#xff0c;如果它和除了它自身以外的所有 正因子 之和相等&#xff0c;我们称它为 「完美数」。 给定一个 整数 n&#xff0c; 如果是完美数&#xff0c;返回 true&#xff1b;否则返回 false。 问题分析 按照题目要求找出每一个因子&#xff0c…

H12-821_74

74.在某路由器上查看LSP&#xff0c;看到如下结果&#xff1a; A.发送目标地址为3.3.3.3的数据包时&#xff0c;打上标签1026&#xff0c;然后发送。 B.发送目标地址为4.4.4.4的数据包时&#xff0c;不打标签直接发送。 C.当路由器收到标签为1024的数据包&#xff0c;将把标签…

文心一言 VS 讯飞星火 VS chatgpt (196)-- 算法导论14.3 4题

四、用go语言&#xff0c;给定一棵区间树 T 和一个区间 i &#xff0c;请描述如何在 O(min(n&#xff0c;klgn)) 时间内列出 T 中所有与 i 重叠的区间&#xff0c;其中 k 为输出的区间数。(提示:一种简单的方法是做若干次查询&#xff0c;并且在这些查询操作中修改树&#xff0…

Java图形化界面编程—— 基本组件和对话框 笔记

2.5 AWT中常用组件 2.5.1 基本组件 组件名功能ButtonButtonCanvas用于绘图的画布Checkbox复选框组件&#xff08;也可当做单选框组件使用&#xff09;CheckboxGroup选项组&#xff0c;用于将多个Checkbox 组件组合成一组&#xff0c; 一组 Checkbox 组件将只有一个可以 被选中…