Linux操作系统7- 线程同步与互斥7(RingQueue环形队列生产者消费者模型改进)

news2025/3/31 14:24:52

上篇文章:Linux操作系统7- 线程同步与互斥6(POSIX信号量与环形队列生产者消费者模型)-CSDN博客

本篇代码仓库:myLerningCode/l36 · 橘子真甜/Linux操作系统与网络编程学习 - 码云 - 开源中国 (gitee.com)

目录

一. 单生产单消费单保存模型

1.1 RingQueue.hpp

1.2 Task.hpp

1.3 MainPC.cpp

1.4 测试

二. 多生产多消费模型        

2.1 分析与代码 

2.2 多生产多消费的意义


一. 单生产单消费单保存模型

        通过RingQueue可以实现生产者消费者之间的协同工作,如果现在想要将消费者的输出结果保存在文件中应该怎么办?

        可以定义两个环形队列,三个线程。让消费者充当第二个队列的生产者。

代码如下:

1.1 RingQueue.hpp

        直接使用上篇文件的代码即可。然后我们需要新增一个类,这个类中包含两个环形队列用于消费者同时访问两个队列

#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
const int gnum = 10; // 阻塞队列的最大容量

template <class T>
class RingQueue
{
private:
    void P(sem_t &sem)
    {
        // P操作,申请信号量sem--
        int n = sem_wait(&sem);
        if (n < 0)
            std::cerr << "P操作失败" << std::endl;
    }

    void V(sem_t &sem)
    {
        // V操作,释放信号量,sem++
        int n = sem_post(&sem);
        if (n < 0)
            std::cerr << "V操作失败" << std::endl;
    }

public:
    RingQueue(const int &maxnum = gnum) : _maxnum(maxnum), _ringQueue(gnum)
    {
        // 在构造函数中完成信号量的初始化
        int n = sem_init(&_spaceSem, 0, maxnum);
        if (n < 0)
            std::cerr << "spaceSem信号量初始化失败" << std::endl;
        n = sem_init(&_dataSem, 0, 0);
        if (n < 0)
            std::cerr << "dataSem信号量初始化失败" << std::endl;

        _producerStep = _consumerStep = 0;
    }

    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
    }

    // 生产者插入数据
    void push(const T &in)
    {
        // 申请空间资源
        P(_spaceSem);

        // 插入数据
        _ringQueue[_producerStep++] = in;
        _producerStep %= _maxnum;

        // 释放一个数据资源
        V(_dataSem);
    }

    // 消费者获取数据
    void pop(T *out)
    {
        // 申请数据资源
        P(_dataSem);

        // 插入数据
        *out = _ringQueue[_consumerStep++];
        _consumerStep %= _maxnum;

        // 释放一个空间资源
        V(_spaceSem);
    }

private:
    std::vector<T> _ringQueue; // 使用数组来实现环形队列
    size_t _maxnum;

    sem_t _spaceSem; // 生产者空间资源信号量
    sem_t _dataSem;  // 消费者数据资源信号量

    int _producerStep; // 生产者下标
    int _consumerStep; // 消费者下标
};

1.2 Task.hpp

        需要新增一个保存者的任务

#pragma once
#include <iostream>
#include <cstdio>
#include <functional>

class CalTask
{
    using func_t = std::function<int(int, int, char)>; // func是一个函数
    // typedef std::function<int(int,int)> func;
public:
    CalTask() {}

    CalTask(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _callback(func) {}

    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof(buffer) - 1, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }

    // 返回任务操作的结果
    std::string toString()
    {
        char buffer[1024];
        snprintf(buffer, sizeof(buffer) - 1, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;         // 操作的任务的id
    func_t _callback; // 调用的函数
};

class SaveTask
{
    typedef std::function<void(const std::string &)> func_t;

public:
    SaveTask() {}
    SaveTask(const std::string &message, func_t func)
        : _message(message), _callback(func) {}
    void operator()()
    {
        _callback(_message);
    }

private:
    std::string _message; // 保存的信息
    func_t _callback;     // 将信息写入文件中
};

const std::string oper = "+-*/%";
int my_math(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero" << std::endl;
            return -1;
        }
        else
        {
            result = x / y;
        }
        break;
    }
    case '%':
        if (y == 0)
        {
            std::cerr << "moved zero" << std::endl;
            return -1;
        }
        else
        {
            result = x % y;
        }
        break;
    default:
        break;
    }
    return result;
}

void Save(const std::string &message)
{
    const std::string task_pwd = "./log.txt";
    FILE *fp = fopen(task_pwd.c_str(), "a+");
    if (nullptr == fp)
    {
        std::cerr << "saver open error" << std::endl;
        return;
    }
    fputs(message.c_str(), fp);
    fputc('\n', fp);

    fclose(fp);
}

1.3 MainPC.cpp

#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>

// 生产者
void *ProductorRoutine(void *args)
{
    RingQueue<CalTask> *cal_rq = (static_cast<RingQueues<CalTask, SaveTask> *>(args))->_cal_rq;
    while (true)
    {
        int x = rand() % 20;
        int y = rand() % 50;
        const char op = oper[rand() % oper.size()];
        CalTask ct(x, y, op, my_math);
        cal_rq->push(ct);
        std::cout << "生产者生产任务:" << ct.toString() << " 并传递给消费者完成" << std::endl;
    }
}

// 消费者
void *ConsumerRoutine(void *args)
{
    RingQueue<CalTask> *cal_rq = (static_cast<RingQueues<CalTask, SaveTask> *>(args))->_cal_rq;
    RingQueue<SaveTask> *save_rq = (static_cast<RingQueues<CalTask, SaveTask> *>(args))->_save_rq;

    while (true)
    {
        CalTask ct;
        cal_rq->pop(&ct);
        std::string result = ct();
        std::cout << "消费者实现任务:" << result << " 实现完成!" << std::endl;
        SaveTask st(result, Save);
        save_rq->push(st);
        std::cout << "消费者传递任务:" << result << " 给保存者完成!" << std::endl;
    }
}

// 保存者
void *SaverRoutine(void *args)
{
    RingQueue<SaveTask> *save_rq = (static_cast<RingQueues<CalTask, SaveTask> *>(args))->_save_rq;
    while (true)
    {
        SaveTask st;
        save_rq->pop(&st);
        st();
        std::cout << "保存者保存任务完成!" << std::endl;
    }
}

void test1()
{
    RingQueues<CalTask, SaveTask> *rqs = new RingQueues<CalTask, SaveTask>;
    rqs->_cal_rq = new RingQueue<CalTask>();
    rqs->_save_rq = new RingQueue<SaveTask>();

    pthread_t p, c, s;
    pthread_create(&p, nullptr, ProductorRoutine, rqs);
    pthread_create(&c, nullptr, ConsumerRoutine, rqs);
    pthread_create(&s, nullptr, SaverRoutine, rqs);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    pthread_join(s, nullptr);
    delete rqs;
}

int main()
{
    srand((unsigned int)time(0) ^ getpid() ^ pthread_self());
    test1();
    return 0;
}

1.4 测试

        运行结果如下:

二. 多生产多消费模型        

2.1 分析与代码 

        RingQueue环形队列可以保证单个生产者和单个消费者之间的同步与互斥,如果现在有多个生产者和多个消费者的话。如何保证生产者之间的互斥?消费者者之间的互斥?

        阻塞队列中,我们通过加锁的方式让同一时刻只能有一个生产者线程进入临界区或者一个消费者进入临界区。

        而环形队列中, 通过信号量保证了生产者消费者之间的同步与互斥。如果想要保证消费者与消费者之间的互斥,生产者与生产者之间的互斥,也需要加锁保护

        在RingQueue中添加两个成员变量,一个生产者互斥锁,一个消费者互斥锁。同时需要在构造函数中完成锁的初始化,析构函数中完成锁的销毁。

        并且在push函数中加生产者锁,在pop函数中加消费者锁。以实现生产者与生产者之间的互斥和消费者与消费者之间的互斥。(本质是防止多个线程同时访问导致生产者下标或者消费者下标出现数据错误) 

代码如下:

#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
const int gnum = 10; // 阻塞队列的最大容量

template <class T>
class RingQueue
{
private:
    void P(sem_t &sem)
    {
        // P操作,申请信号量sem--
        int n = sem_wait(&sem);
        if (n < 0)
            std::cerr << "P操作失败" << std::endl;
    }

    void V(sem_t &sem)
    {
        // V操作,释放信号量,sem++
        int n = sem_post(&sem);
        if (n < 0)
            std::cerr << "V操作失败" << std::endl;
    }

public:
    RingQueue(const int &maxnum = gnum) : _maxnum(maxnum), _ringQueue(gnum)
    {
        // 在构造函数中完成信号量的初始化
        int n = sem_init(&_spaceSem, 0, maxnum);
        if (n < 0)
            std::cerr << "spaceSem信号量初始化失败" << std::endl;
        n = sem_init(&_dataSem, 0, 0);
        if (n < 0)
            std::cerr << "dataSem信号量初始化失败" << std::endl;

        _producerStep = _consumerStep = 0;

        // 初始化锁
        pthread_mutex_init(&_pmtx, nullptr);
        pthread_mutex_init(&_cmtx, nullptr);
    }

    ~RingQueue()
    {
        // 销毁信号量与互斥锁
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);

        //
        pthread_mutex_destroy(&_cmtx);
        pthread_mutex_destroy(&_pmtx);
    }

    // 生产者插入数据
    void push(const T &in)
    {
        // 生产者加锁,保证生产者与生产者之间的互斥
        pthread_mutex_lock(&_pmtx);
        // 申请空间资源
        P(_spaceSem);

        // 插入数据
        _ringQueue[_producerStep++] = in;
        _producerStep %= _maxnum;

        // 释放一个数据资源
        V(_dataSem);
        // 解锁
        pthread_mutex_unlock(&_pmtx);
    }

    // 消费者获取数据
    void pop(T *out)
    {
        // 消费者加锁
        pthread_mutex_lock(&_cmtx);
        // 申请数据资源
        P(_dataSem);

        // 插入数据
        *out = _ringQueue[_consumerStep++];
        _consumerStep %= _maxnum;

        // 释放一个空间资源
        V(_spaceSem);

        // 解锁
        pthread_mutex_unlock(&_cmtx);
    }

private:
    std::vector<T> _ringQueue; // 使用数组来实现环形队列
    size_t _maxnum;

    sem_t _spaceSem; // 生产者空间资源信号量
    sem_t _dataSem;  // 消费者数据资源信号量

    int _producerStep; // 生产者下标
    int _consumerStep; // 消费者下标

    pthread_mutex_t _pmtx;
    pthread_mutex_t _cmtx;
};

MainPC.cpp

#include <iostream>
#include <memory>
#include <string>

#include <unistd.h>
#include <pthread.h>
#include "RingQueue.hpp"
#include "Task.hpp"

const std::string OP = "+-*/%";
void *producer(void *args)
{
    // 获取交易场所 - 阻塞队列
    RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(args);
    while (true)
    {
        int x = rand() % 100;
        int y = rand() % 100;
        char op = OP[rand() % OP.size()];
        // 打印日志
        printf("生产者生产的数据:%d %c %d 并交给消费者计算\n", x, op, y);
        CalTask ct(x, y, op, my_math);
        rq->push(ct);
    }

    return nullptr;
}

void *consumer(void *args)
{
    // 获取交易场所 - 生产消费阻塞队列,消费保存阻塞队列
    RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(args);

    while (true)
    {
        // 获取任务计算
        CalTask ct;
        rq->pop(&ct);
        std::string result = ct();
        std::cout << "消费者获取数据并计算:" << result << std::endl;
    }
    return nullptr;
}

int main()
{
    srand((unsigned int)time(0) ^ getpid());
    // 建立任务队列和保存队列
    RingQueue<CalTask> *rq = new RingQueue<CalTask>;

    pthread_t c[3], p[3];
    pthread_create(&c[0], nullptr, consumer, (void *)rq);
    pthread_create(&c[1], nullptr, consumer, (void *)rq);
    pthread_create(&c[2], nullptr, consumer, (void *)rq);

    pthread_create(&p[0], nullptr, producer, (void *)rq);
    pthread_create(&p[1], nullptr, producer, (void *)rq);
    pthread_create(&p[2], nullptr, producer, (void *)rq);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(c[2], nullptr);

    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);
    delete rq;
    return 0;
}

测试结果如下:

可以看到,长时间没有出错

如果将加锁解锁操作进行注释:

会出现段错误,因为访问了非法内存

2.2 优化加解锁

        我们申请信号量和释放信号量是原子操作,不需要加锁解锁。无需将这个两个代码放入到临界区。并且一个线程在加锁期间,其他线程是可以申请信号量的!

    // 生产者插入数据
    void push(const T &in)
    {
        // 申请空间资源
        P(_spaceSem);

        // 生产者加锁,保证生产者与生产者之间的互斥
        pthread_mutex_lock(&_pmtx);

        // 插入数据
        _ringQueue[_producerStep++] = in;
        _producerStep %= _maxnum;

       // 解锁
       pthread_mutex_unlock(&_pmtx);

        // 释放一个数据资源
        V(_dataSem);
    }

    // 消费者获取数据
    void pop(T *out)
    {
        // 申请数据资源
        P(_dataSem);

        // 消费者加锁
        pthread_mutex_lock(&_cmtx);

        // 插入数据
        *out = _ringQueue[_consumerStep++];
        _consumerStep %= _maxnum;

        // 解锁
        pthread_mutex_unlock(&_cmtx);

        // 释放一个空间资源
        V(_spaceSem);
    }

        一样一来,一个线程加锁进入临界区之后不会去影响其他线程申请信号量。从而提高整体的效率。 

2.3 多生产多消费的意义

        与阻塞队列BlockQueue一样,多生产多消费的时候。生产线程生产数据可能需要很多时间,一个生产者生产者访问环形队列的时候不妨碍其他线程生产自己的资源。一个消费者访问环形队列拿数据的时候不妨碍其他消费者拿到数据进行处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2323857.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

将 Markdown 表格结构转换为Excel 文件

在数据管理和文档编写过程中&#xff0c;我们经常使用 Markdown 来记录表格数据。然而&#xff0c;Markdown 格式的表格在实际应用中不如 Excel 方便&#xff0c;特别是需要进一步处理数据时。因此&#xff0c;我们开发了一个使用 wxPython 的 GUI 工具&#xff0c;将 Markdown…

微信小程序逆向开发

一.wxapkg文件 如何查看微信小程序包文件&#xff1a; 回退一级 点击进入这个目录 这个就是我们小程序对应的文件 .wxapkg概述 .wxapkg是微信小程序的包文件格式&#xff0c;且其具有独特的结构和加密方式。它不仅包含了小程序的源代码&#xff0c;还包括了图像和其他资源文…

Spring Data审计利器:@LastModifiedDate详解!!!

&#x1f552; Spring Data审计利器&#xff1a;LastModifiedDate详解&#x1f525; &#x1f31f; 简介 在数据驱动的应用中&#xff0c;记录数据的最后修改时间是常见需求。Spring Data的LastModifiedDate注解让这一过程自动化成为可能&#xff01;本篇带你掌握它的核心用法…

wms窗口/多窗口/自由窗口systemui侧边栏手势退出实战-学员作业

背景&#xff1a; 再学习了马哥的分屏自由窗口专题课程时候&#xff0c;有一个需求就是实现自由窗口置顶的功能&#xff0c;这个需求实现后&#xff0c;自由窗口就会一直处于顶端&#xff0c;不会因为打开其他Activity导致自由窗口退出。 不会因为打开了其他Activity而导致短…

服装零售行业数据分析方案

在数据洪流的时代&#xff0c;大数据分析已成为服装产业的强大引擎&#xff0c;助力企业飞速提升运营效率&#xff0c;削减成本&#xff0c;并优化资源配置。在服饰行业的生产运营链中&#xff0c;商业智能&#xff08;BI&#xff09;工具扮演着至关重要的角色&#xff0c;它们…

基于大模型的pc版语音对话问答

Vosk基础知识&#xff1a; Vosk 是一个强大的开源语音识别工具包&#xff0c;以下是对它的详细介绍&#xff1a; 特点 离线识别&#xff1a;Vosk 的显著特点是支持离线语音识别。这意味着在没有网络连接的情况下&#xff0c;也能进行语音识别操作&#xff0c;避免了因网络问…

深入理解 Linux 内核中的 GPU 子系统:从 DRM 到 NXP 驱动架构全解读

本文不仅为 GPU 子系统的深入复习笔记&#xff0c;更是一本面向 Linux 内核开发者、嵌入式图形系统开发人员的实践指南。本文围绕 drivers/gpu 展开&#xff0c;特别聚焦 NXP i.MX 系列平台的 GPU 架构和 Linux-imx 的实现方式&#xff0c;内容超 5000 字&#xff0c;适合收藏学…

Allegro界面颜色改变设置

概述&#xff1a;本文主要讲解如何改变allegro的背景颜色&#xff0c;改为自己喜欢的颜色 1、 打开Allegro文件 2、 Setup—User Preference—UI—General—Allegro_theme选择Light即可 改变前 改变后

ThreadLocal与Cookie + Session?

这篇文章主要在做 Echo 社区项目的时候写的&#xff0c;在保持用户登录态的这个需求下&#xff0c;为啥要用 ThreadLocal 存储用户信息&#xff0c;而不是采用常见的 Cookie Session。 Cookie Session 由于 HTTP 协议是无状态的&#xff0c;完成操作关闭浏览器后&#xff0c;…

【算法】二分查找(下)

一、山峰数组的峰顶索引 题目链接&#xff1a;852. 山脉数组的峰顶索引 - 力扣&#xff08;LeetCode&#xff09; 题目描述&#xff1a; 给定一个长度为 n 的整数 山脉 数组 arr &#xff0c;其中的值递增到一个 峰值元素 然后递减。 返回峰值元素的下标。 你必须设计并实现时…

【动手学深度学习】#6 卷积神经网络

主要参考学习资料&#xff1a; 《动手学深度学习》阿斯顿张 等 著 【动手学深度学习 PyTorch版】哔哩哔哩跟李牧学AI 由于本系列一开始跳过了第一章引言部分&#xff0c;因此系列编号比书本章节编号提前。现改为和书本统一&#xff08;因为之前自己的原始笔记也是按照书本章节编…

认识一家公司:瑞芯微(Rockchip Electronics Co., Ltd.)以及旗下的两款芯片RK3288\RK3588

瑞芯微&#xff08;Rockchip Electronics Co., Ltd.&#xff09;简介 一、公司概况 瑞芯微电子股份有限公司&#xff08;简称“瑞芯微”&#xff09;成立于2001年&#xff0c;总部位于中国福建省福州市&#xff0c;是一家专注于集成电路设计与研发的高新技术企业。公司采用Fa…

Netty——零拷贝

文章目录 1. 什么是零拷贝&#xff1f;2. 为什么需要零拷贝&#xff1f;2.1 传统 I/O 的拷贝流程2.2 零拷贝的优化2.2.1 通过 sendfile 系统调用2.2.2 通过 mmap (内存映射) 系统调用 3. Netty 实现零拷贝的方式3.1 文件传输优化&#xff1a;FileRegion 封装3.2 直接内存 (Dire…

Java制作简单的聊天室(复习)

设计的知识点&#xff1a;几乎包含java基础的全部知识点&#xff08;java基础语法&#xff0c;java基础进阶&#xff1a;双列集合&#xff0c;io流&#xff0c;多线程&#xff0c;网络编程等&#xff09; 代码如下 客户端&#xff1a; 服务器采用的时多线程的循环多线程的方式…

内核、进程和线程---操作系统

操作系统 操作系统位于用户程序和硬件之间&#xff0c;通过系统调用提供接口可以让应用程序去使用硬件&#xff0c;但是硬件资源的管理和安全控制由操作系统负责。 用户空间和内存空间 在计算机系统中&#xff0c;内存可以分为两大区域&#xff1a;内核空间&#xff08;Ker…

如何在 Postman 中上传图片并在请求中正确引用?

Postman 是一款常用的 API 测试工具&#xff0c;它不仅可以测试 API 的请求和响应&#xff0c;还支持多种数据格式包括图片。如何在 Postman 中传输图片&#xff1f; Postman 如何上传图片并在请求中使用教程

安全+低碳+高效:Acrel-3000助力企业打造未来型电能管理体系-安科瑞黄安南

一 背景 电能因为方便传输、易于转换、便于控制等特性&#xff0c;成为广大企事业单位生产、办公最主要的能量来源。双碳背景下&#xff0c;由于电能清洁、高效、零排放的特点&#xff0c;能源消费侧将逐步以电代煤、以电代油、以电代气&#xff0c;形成以电为中心的能源消费体…

专注自习室:番茄工作法实践

专注自习室&#xff1a;番茄工作法实践 我需要一个任务管理工具&#xff0c;但在网上找了很多都找不到合适的工具。市面上的大多数产品过于强调任务完成性&#xff0c;给我带来了很强的心理压力&#xff0c;这种压力最终反而降低了我的工作效率。于是我决定自己动手&#xff0…

LeetCode算法题(Go语言实现)_16

题目 给定一个二进制数组 nums 和一个整数 k&#xff0c;假设最多可以翻转 k 个 0 &#xff0c;则返回执行操作后 数组中连续 1 的最大个数 。 一、代码实现 func longestOnes(nums []int, k int) int {left, zeroCnt, maxLen : 0, 0, 0for right : 0; right < len(nums); …

CORDIC算法:三角函数的硬件加速革命——从数学原理到FPGA实现的超高效计算方案

计算机该如何求解三角函数&#xff1f;或许你的第一印象是采用泰勒展开&#xff0c;或者采用多项式进行逼近。对于前者&#xff0c;来回的迭代计算开销成本很大&#xff1b;对于后者&#xff0c;多项式式逼近在较窄的范围內比较接近&#xff0c;超过一定范围后&#xff0c;就变…