Linux——生产者消费者模型

news2025/1/22 18:53:17

目录

一.为何要使用生产者消费者模型

 二.生产者消费者模型优点

 三.基于BlockingQueue的生产者消费者模型

1.BlockingQueue——阻塞队列

2.实现代码

 四.POSIX信号量

五.基于环形队列的生产消费模型


一.为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

 

 二.生产者消费者模型优点

  • 解耦:生产者和消费者不直接解除,无需关心对方的情况,仅仅自己与缓冲区解除。
  • 支持并发:并发的体现并不在于多个消费者同时从缓冲区中拿数据,也不是多个生产者同时从缓冲区放数据,而是消费者在处理拿到的数据的时候,生产者可以继续向缓冲区放数据。
  • 支持忙闲不均 :当生产者生产过快的时候,可以让生产者慢下来,当消费者消费过快的时候,可以让消费者慢下来。

 三.基于BlockingQueue的生产者消费者模型

 1.BlockingQueue——阻塞队列

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

2.实现代码

#include <iostream>
#include <string>
#include <queue>
#include <ctime>
#include <unistd.h>
#include <pthread.h>

using namespace std;

template <class T>
class BlockQueue
{
public:
    BlockQueue(size_t cap)
        : _cap(cap)
    {
        // 初始化条件变量
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }

    void push(T date)
    {
        // 将任务push进去队列,多线程加锁,每一只能一个线程push任务
        pthread_mutex_lock(&_mutex);
        while (_q.size() == _cap) // 如果队列已经满了,生产者要被阻塞
        {
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        _q.push(date);
        // 当push任务成功的时候,需要将唤醒消费者来处理数据
        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }

    T pop()
    {
        // 将任务从队列中拿出来,多线程加锁,每一只能一个线程拿任务
        pthread_mutex_lock(&_mutex);
        // 如果队列是空的就将消费者阻塞
        while (isempty())
        {
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        T tmp = _q.front();
        _q.pop();
        // 成功拿到数据以后,唤醒生产者继续生产任务
        pthread_cond_signal(&_p_cond);
        pthread_mutex_unlock(&_mutex);

        return tmp;
    }

    ~BlockQueue()
    {
        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }

private:
    bool isempty()
    {
        return _q.empty();
    }
    bool isfull()
    {
        return _q.size() == _cap;
    }

private:
    queue<T> _q; // 队列
    size_t _cap; // 容量

    pthread_cond_t _c_cond;                             // 消费者条件变量
    pthread_cond_t _p_cond;                             // 生产者条件变量
    pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
};

cp模型:

#include "BlockQueue.hpp"

using namespace std;

// 构建任务
struct Task
{
    Task(int a, int b, char op)
        : _a(a), _b(b), _op(op)
    {
    }

    char _op;      // 运算符
    int _a;        // 运算数1
    int _b;        // 运算数2
    int ret;       // 结果
    int _exitcode; // 退出码
};

void *push_task(void *args)
{
    BlockQueue<Task> *pBQ = static_cast<BlockQueue<Task> *>(args);
    while (1)
    {
        char op_arr[4] = {'+', '-', '*', '/'};
        int a = rand() % 10;
        int b = rand() % 10;
        char op = op_arr[(a * b) % 4];
        // 构建任务结构体
        Task tk(a, b, op);
        // push任务
        pBQ->push(tk);
        printf("%d %c %d =?\n", a, op, b);
        sleep(1);
    }

    return NULL;
}

void *get_task(void *args)
{
    BlockQueue<Task> *pBQ = static_cast<BlockQueue<Task> *>(args);

    while (1)
    {
        // 获取任务并处理
        Task tk = pBQ->pop();
        switch (tk._op)
        {
        case '+':
            tk.ret = tk._a + tk._b;
            break;
        case '-':
            tk.ret = tk._a - tk._b;
            break;
        case '*':
            tk.ret = tk._a * tk._b;
            break;
        case '/':
            if (tk._b == 0)
            {
                exit(-1);
            }
            tk.ret = tk._a / tk._b;
            break;
        default:
            break;
        }
        printf("%d %c %d = %d\n", tk._a, tk._op, tk._b, tk.ret);
        sleep(1);
    }

    return NULL;
}

int main()
{
    BlockQueue<Task> BQ(5);
    pthread_t tid_c[4];
    pthread_t tid_p[4];
    srand(time(nullptr));
    // push
    pthread_create(&tid_c[0], NULL, push_task, &BQ);
    pthread_create(&tid_c[1], NULL, push_task, &BQ);
    pthread_create(&tid_c[2], NULL, push_task, &BQ);
    pthread_create(&tid_c[3], NULL, push_task, &BQ);
    // get
    pthread_create(&tid_p[0], NULL, get_task, &BQ);
    pthread_create(&tid_p[1], NULL, get_task, &BQ);
    pthread_create(&tid_p[2], NULL, get_task, &BQ);
    pthread_create(&tid_p[3], NULL, get_task, &BQ);

    pthread_join(tid_c[0], NULL);
    pthread_join(tid_c[1], NULL);
    pthread_join(tid_c[2], NULL);
    pthread_join(tid_c[3], NULL);
    pthread_join(tid_p[0], NULL);
    pthread_join(tid_p[1], NULL);
    pthread_join(tid_p[2], NULL);
    pthread_join(tid_p[3], NULL);

    return 0;
}

测试结果:

 四.POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

定义信号量:

sem_t sem;

初始化信号量:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  1. pshared:0表示线程间共享,非零表示进程间共享。
  2. value:信号量初始值。

销毁信号量:

int sem_destroy(sem_t *sem);

等待信号量:

功能:等待信号量,会将信号量的值减1。

int sem_wait(sem_t *sem); //P()

发布信号量:

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。

int sem_post(sem_t *sem);//V()

说明:

  • 信号量本身就是一个计数器,用来描述可用资源的数目。
  • 信号量机制就像是我们看电影买票一样,是对资源的预定机制。
  • 只有申请到信号量才能对共享资源访问。
  • 只要我们申请信号量成功了,将来我们一定可以访问临界资源,就像看电影,我们只要买到了电影票,不管我们去不去电影院,电影院里一定有我们的位置。

五.基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性。

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。

代码:

RingQueue.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdlib>
#include <pthread.h>
#include <vector>
#include <unistd.h>
#include <semaphore.h>
#include "mutex.hpp"
#include "Task.hpp"
using namespace std;

const size_t size = 5;

template <class T>
class RingQueue
{
    void P(sem_t &sem) // 申请信号量
    {
        sem_wait(&sem);
    }

    void V(sem_t &sem) // 释放信号量
    {
        sem_post(&sem);
    }

public:
    RingQueue(int cap = size)
        : _cap(cap), _index_space(0), _index_date(0)
    {
        // 初始化信号量
        sem_init(&_sem_date, 0, 0);    // 数据信号量初始化为0
        sem_init(&_sem_space, 0, cap); // 空间信号量初始化为容量大小
        // 初始化锁
        pthread_mutex_init(&_mutex, nullptr);
        _rq.resize(_cap);
    }

    void push(const T date)
    {
        // 申请空间信号量
        P(_sem_space);
        {
            MutexGuard lock(&_mutex);
            _rq[_index_date++] = date;
            _index_date %= _cap;
        }
        // 释放数据信号量
        V(_sem_date);
    }

    T pop()
    {
        // 申请数据信号量
        P(_sem_date);
        T tmp;
        {
            MutexGuard lock(&_mutex);
            tmp = _rq[_index_space++];
            _index_space %= _cap;
        }
        // 释放空间信号量
        V(_sem_space);
        return tmp;
    }

    ~RingQueue()
    {
        // 释放信号量和互斥锁
        sem_destroy(&_sem_date);
        sem_destroy(&_sem_space);
        pthread_mutex_destroy(&_mutex);
    }

private:
    vector<T> _rq;
    size_t _cap; // 容量

    sem_t _sem_space; // 记录环形队列的空间信号量
    sem_t _sem_date;  // 记录环形队列的数据信号量

    size_t _index_space; // 生产者的生产位置
    size_t _index_date;  // 消费者的消费位置

    pthread_mutex_t _mutex; // 容量
};

mutex.hpp:

class Mutex
{
public:
    Mutex(pthread_mutex_t *mutex)
        : _mutex(mutex)
    {
    }

    void lock()
    {
        pthread_mutex_lock(_mutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(_mutex);
    }
    ~Mutex()
    {
    }

private:
    pthread_mutex_t *_mutex;
};

class MutexGuard
{
public:
    MutexGuard(pthread_mutex_t *mutex)
        : _mutex(mutex)
    {
        _mutex.lock();
    }
    ~MutexGuard()
    {
        _mutex.unlock();
    }

private:
    Mutex _mutex;
};

Task.hpp:

#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdlib>

struct Task
{
    Task(int a = 1, int b = 1, char op = '+')
        : _a(a), _b(b), _op(op)
    {
    }

    void run()
    {
        switch (_op)
        {
        case '+':
            _ret = _a + _b;
            break;
        case '-':
            _ret = _a - _b;
            break;
        case '*':
            _ret = _a * _b;
            break;
        case '/':
            if (_b == 0)
            {
                _exitcode = -1;
                exit(1);
            }
            _ret = _a / _b;
            break;
        default:
            break;
        }
    }

    void showtask()
    {
        printf("producer:%d %c %d = ?\n", _a, _op, _b);
    }

    void showresult()
    {
        printf("consumer:%d %c %d = %d(exitcode:%d)\n", _a, _op, _b, _ret, _exitcode);
    }

    ~Task() {}

private:
    int _a;
    int _b;
    char _op;
    int _ret;

    int _exitcode = 0;
};

pthread.cc:

#include "RingQueue.hpp"

void *run_p(void *args)
{
    char ops[4] = {'+', '-', '*', '/'};
    RingQueue<Task> *RQ = static_cast<RingQueue<Task> *>(args);
    while (1)
    {
        int a = rand() % 100;
        int b = rand() % 100;
        int op = ops[(a * b) % 4];
        Task tk(a, b, op);

        RQ->push(tk);
        tk.showtask();
        sleep(1);
    }
}
void *run_c(void *args)
{
    RingQueue<Task> *RQ = static_cast<RingQueue<Task> *>(args);
    while (1)
    {
        Task tk = RQ->pop();
        tk.run();
        tk.showresult();
        sleep(1);
    }
}

int main()
{
    RingQueue<Task> *RQ = new RingQueue<Task>(5);
    srand(time(0));
    pthread_t tid_c[5];
    pthread_t tid_p[5];

    for (int i = 0; i < 5; i++)
    {
        pthread_create(&tid_c[i], nullptr, run_c, RQ);
        pthread_create(&tid_p[i], nullptr, run_p, RQ);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(tid_c[i], nullptr);
        pthread_join(tid_p[i], nullptr);
    }

    delete RQ;

    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1102169.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

面试官:做过性能优化?我:任务切片!

给大家推荐一个实用面试题库 1、前端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;web前端面试题库 代码背景 本次分享基于一次线上环境的卡顿事故&#xff0c;客户数据体量过大导致的页面卡顿甚至页面直接崩溃的问题…

pdf压缩文件怎么压缩最小?

pdf压缩文件怎么压缩最小&#xff1f;我们很多项目介绍或是学术的报告都是采用的这个pdf格式&#xff0c;那么我们在存储或是需要进行分享的时候&#xff0c;可能就会因为文件过大而导致无法打开或是发送了。那么就需要将其进行压缩。PDF文件压缩方法很多&#xff0c;pdf压缩文…

【前端】JS - WebAPI

目 录 一.WebAPI 背景知识什么是 WebAPI什么是 APIAPI 参考文档 二.DOM 基本概念什么是 DOMDOM 树 三.获取元素querySelectorquerySelectorAll 四.事件初识基本概念事件三要素 五.操作元素获取/修改元素内容&#xff08;innerHTML&#xff09;获取/修改元素属性获取/修改样式属…

为什么全链路压测如此重要?

在今天的数字化世界中&#xff0c;软件系统的稳健性和性能至关重要。用户对于应用程序的高可用性和快速响应时间有着越来越高的期望&#xff0c;因此&#xff0c;全链路压测变得至关重要。本文将深入探讨什么是全链路压测&#xff0c;为什么它如此重要以及如何进行全链路压测。…

【ESP32】C语言映射表在嵌入式串口解析中的应用

本文章主要以ESP32开发环境为例记录&#xff0c;C语言映射表在嵌入式串口解析中的应用 【ESP32】C语言映射表在嵌入式串口解析中的应用 一、C语言映射表在串口数据解析中的应用1、数据结构2、指令、函数映射表3、串口解析函数实现 二、实验现象三、实验代码 一、C语言映射表在串…

Python超入门(4)__迅速上手操作掌握Python

# 15.while循环 rows 0 while rows < 5:print(* * rows)rows 1* ** *** ****# 16.使用while循环制作猜灯谜游戏secret_num 12 guess_count 0 guess_limit 100while guess_count < guess_limit:guess_count int(input("猜测:"))if guess_count secret_n…

turn搭建测试

安装 安装环境 cat /etc/redhat-release CentOS Linux release 7.6.1810 (Core)相关系统环境安装 sudo yum install -y make gcc cc gcc-c wgetsudo yum install -y openssl-devel libevent libevent-devel安装libEvent组件 # 如果链接不可用直接去官网找对应release包下载…

面试算法27:回文链表

问题 如何判断一个链表是不是回文&#xff1f;要求解法的时间复杂度是O&#xff08;n&#xff09;&#xff0c;并且不得使用超过O&#xff08;1&#xff09;的辅助空间。如果一个链表是回文&#xff0c;那么链表的节点序列从前往后看和从后往前看是相同的。 分析 如果不考虑…

使用两个goroutine交替、顺序打印一段字符串的字符

1、使用两个goroutine交替、顺序打印一段字符串的字符 输入&#xff1a;hello world 输出&#xff1a;hello world 关键点&#xff1a;控制goroutine的执行先后循序 golang语言版本&#xff1a; package mainimport ("fmt""sync" )func main() {conte…

数据结构——线性表作业

目录 选择题和填空题 编程题 1. 输出单链表倒数第K个结点值 单链表 双指针 2. 数组元素移动 3. 多项式相加 4. 数组的循环左移 选择题和填空题 编程题 1. 输出单链表倒数第K个结点值 【问题描述】 输入一个单向链表&#xff0c;输出该链表中倒数第k个结点&#xff0…

外贸知识:谈好订单的客户临时却要求价格优惠怎么办?

距离订单成交只差一步的时候&#xff0c;客户却要求价格一降再降&#xff0c;是为什么&#xff1f; 真的只是为了降价吗&#xff1f; Carl是一个苦苦挣扎的业务员&#xff0c;每次在挖掘客户需求的过程中&#xff0c;客户都是一副“对对对&#xff0c;是是是&#xff0c;好好好…

那些年,我们追过的Java BUG

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

基于springboot实现银行OA系统的设计与实现平台项目【项目源码+论文说明】计算机毕业设计

摘要 在互联网信息技术时代中&#xff0c;企业管理更多的是使用管理系统进行智能化控制&#xff0c;提高单位的核心竞争力&#xff0c;适应快节奏的生产活动。银行OA系统是为企业提供的一整套便于企业管理的应用软件&#xff0c;是目前企业管理的必备系统。通过走访了解&#…

几个非常实用的 Chrome Devtools 技巧

原文链接&#xff1a;[几个非常实用的 Chrome Devtools 技巧](https://fe32.top/articles/skill001/ 关于 Chrome 浏览器&#xff0c;如果你是一名前端开发者&#xff0c;相信对此并不陌生&#xff0c;我们可以用它来查看 网络请求、分析网页性能、调试 JavaScript 功能 等。 …

山海鲸报表系统:数据洞察的利器

在信息时代&#xff0c;数据是每个组织的核心资产。然而&#xff0c;拥有大量数据并不足够&#xff0c;我们需要将这些数据转化为有用的信息&#xff0c;以指导决策和行动。这就是山海鲸报表系统的使命。 解析数据的力量 山海鲸报表系统是一款强大的工具&#xff0c;旨在帮助企…

代码随想录打卡第四十二天| ● 62.不同路径 ● 63. 不同路径 II

62 不同路径 **题目&#xff1a;**一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish” &#xff09;。 问总共有多少条…

收银系统哪个好 2023年十大收银软件品牌排行榜

根据2023年的最新数据和市场调研&#xff0c;以下是收银系统十大排名&#xff08;2023年十大收银软件品牌排行榜&#xff09;&#xff1a; 商人宝 一款集成了收银系统、会员管理、进销存和数据分析等多个功能的服装店收银软件&#xff0c;适用于服装卖场、中小型连锁超市、便利…

RunnerGo UI自动化使用体验

RunnerGo怎么做UI自动化 首先需要进入官网&#xff0c;RunnerGo支持开源&#xff0c;可以自行下载安装&#xff0c;也可以点击右上角体验企业版按钮快速体验 点击体验企业版进入工作台后可以点击页面上方的UI自动化 进入到测试页面 创建元素 我们可以在元素管理中创建我们测试…

企业文件防泄密软件!好用的文件加密系统推荐

由于众多企业内部都有大量的机密数据以电子文档的形式存储着&#xff0c;且传播手段多样&#xff0c;很容易造成文件泄密的问题发生。若是员工通过网络泄密重要文件&#xff0c;或是有黑客入侵窃取机密数据等&#xff0c;造成重要文件被非法查看盗取&#xff0c;都会给企业业务…

ROS 物体跟踪示例

物体跟踪与物体识别有相似之处&#xff0c;同样使用特征点检测的方法&#xff0c;但侧重点并不相同。物体识别针对的物体可以是静态的或动态的&#xff0c;根据物体特征点建立的模型作为识别的数据依据&#xff1b;物体跟踪更强调对物体位置的准确定位&#xff0c;输入图像一般…