基于 BlockQueue(阻塞队列) 的 生产者消费者模型

news2025/1/11 18:45:50

文章目录

  • 阻塞队列(BlockQueue)介绍
  • 生产者消费者模型 介绍
  • 代码实现
    • lockGuard.hpp()
    • Task.hpp(任务类)
    • BlockQueue.hpp(阻塞队列)
    • conProd.cc(生产者消费者模型 主进程)
  • 执行结果


阻塞队列(BlockQueue)介绍

阻塞队列(Blocking Queue)是一种特殊类型的队列,它具有阻塞操作的特性。在并发编程中,阻塞队列可以用于实现线程间的安全通信和数据共享。

阻塞队列的 主要特点 是:

  • 队列为空时,消费者线程尝试从队列中获取(出队)元素时会被阻塞,直到有新的元素被添加到队列中为止。
  • 队列已满时,生产者线程尝试向队列中添加(入队)元素时也会被阻塞,直到有空闲容量可用。

阻塞队列通常提供入队操作、出队操作以及获取队列大小等基本方法。

在这里插入图片描述

阻塞队列的实现在下文


生产者消费者模型 介绍

生产者消费者模型 是一种常用的 并发编程模型 ,用于解决多线程或多进程环境下的协作问题。该模型包含两类角色:生产者和消费者

生产者负责生成数据,并将数据存放到共享的缓冲区中。消费者则从缓冲区中获取数据并进行处理。生产者和消费者之间通过共享的缓冲区进行数据交互。

为了确保线程安全,生产者和消费者需要遵循一些规则

  1. 如果缓冲区已满,则生产者需要等待直到有空间可用。
  2. 如果缓冲区为空,则消费者需要等待直到有数据可用。
  3. 生产者和消费者都不能访问缓冲区的内部结构,只能通过特定的接口进行操作。

在这里插入图片描述


代码实现

在代码实现上,生产者消费者模型通常涉及以下几个 角色和操作

  1. 生产者(Producer):负责生成数据并将其放入共享的缓冲区。
  2. 消费者(Consumer):从共享的缓冲区中获取数据并进行处理。
  3. 缓冲区(Buffer):用于暂存生产者生成的数据,供消费者使用。
  4. 同步机制:用于确保生产者和消费者之间的协调和同步,以避免竞态条件和数据不一致性等问题。

我们将要实现的代码中:

阻塞队列 作为缓冲区Task任务类 由生产者生产传入阻塞队列,以便消费者拿去任务消费lockGuard 与条件变量 保证 生产者消费者之间的协调,同步。


lockGuard.hpp()

在 lockGuard.hpp中我们 实现了一个 需封装了互斥锁的Mutex类和一个 实现自动加解锁的lockGuard类

Mutex类封装了pthread_mutex_t类型的互斥锁, lockGuard类是一个RAII风格的加锁方式。

通过这种方式,lockGuard对象的生命周期和锁的生命周期绑定在一起,可以确保在任何情况下都能保证锁的正确释放,避免死锁等问题

完整代码:

#pragma once                                                                           

#include <iostream>
#include <pthread.h>

using std::cout; using std::endl;

// Mutex类封装 pthread_mutex_t 互斥锁
class Mutex
{
public:
    // 构造 
    Mutex(pthread_mutex_t* mtx):_pmtx(mtx){}

    // 调用lock 进行加锁
    void lock()
    {
        cout << "进行加锁" << endl;
        pthread_mutex_lock(_pmtx);
    }

    // 调用unlock 进行解锁
    void unlock()
    {
        cout << "进行解锁" << endl;
        pthread_mutex_unlock(_pmtx);
    }
                                                                                       
    ~Mutex()
    {}

private:
    pthread_mutex_t* _pmtx; 
};


// RAII 风格的加锁方式
// 以实现自动加解锁
class lockGuard
{
public:
    // 构造
    lockGuard(pthread_mutex_t* mtx):_mtx(mtx)
    {
        _mtx.lock();
    }

    // 析构
    ~lockGuard()
    {
        _mtx.unlock();
    }

private:
    Mutex _mtx;                                                                        
};

Task.hpp(任务类)

下面的代码 是一个简化的 任务封装类用于生产者产生任务并将其放入阻塞队列,供消费者取出并执行。通过将函数与参数打包成任务,实现了任务的传递和执行。

#pragma once

#include <iostream>
#include <functional>                                                                  
    
// 表示一个函数类型。    
// func_t是一个接受两个整数参数并返回整数的函数类型    
typedef std::function<int(int, int)> func_t;    

// 任务类型: 用于生产者产生任务
class Task    
{    
public:    
    Task(){};    

    // 传入三个参数x,y,以及一个函数,task则执行func(x, y)    
    Task(int x, int y, func_t func):_x(x),_y(y),_func(func)    
    {}    
    
    // 用于执行任务。在函数体内部,会调用存储在 _func 中的函数对象,
    // 并将 _x 和 _y 作为参数传递给这个函数对象。
    // 最后 返回执行结果。
    int operator()()    
    {    
        return _func(_x,_y);    
    } 
       
public:
    // 用作函数参数
    int _x;
    int _y;
    func_t _func;
};    

BlockQueue.hpp(阻塞队列)

对 阻塞队列 进行类的实现:

BlockQueue包含以下成员变量

std::queue<T> _bq;   // 阻塞队列
int _capacity; // 容量上限
pthread_mutex_t _mtx;   // 互斥锁: 保证队列安全
pthread_cond_t _empty; // 表示bq是否为空
pthread_cond_t _full; // 表示bq是否为满 

以及除构造函数/析构函数外的以下 BlockQueue包含以下成员函数

bool isQueueEmpty() // 判断队列是否为空
bool isQueueFull() // 判断队列是否为满
void push(const T &in) // 生产者用于制造任务
void pop(const T &in) // 消费者用于消耗任务

完整代码:

#pragma once

#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>

#include "lockGuard.hpp"

const int gDefaultCap = 5; // 作为阻塞队列的默认容量

// 阻塞队列
template <class T>
class BlockQueue
{
private:
    // 判断队列是否为空
    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }

    // 判满
    bool isQueueFull()
    {
        return _bq.size() == _capacity; // 当size == _capacity 证明队列已满
    }

public:
    // 构造
    BlockQueue(int capacity = gDefaultCap) : _capacity(capacity)
    {
        // 初始化互斥锁 && 条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_empty, nullptr);
        pthread_cond_init(&_full, nullptr);
    }

    // 析构
    ~BlockQueue()
    {
        // 销毁 互斥锁 && 条件变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_full);
        pthread_cond_destroy(&_empty);
    }

    // 生产者进程
    void push(const T &in)
    {
        // 创建一个lockGuard 变量
        lockGuard lockguard(&_mtx);

        while(isQueueFull())
        {   
            // 如果此时阻塞队列为满,进程等待,直到有空位时改变_full
            pthread_cond_wait(&_full, &_mtx);
        }

        // 此时阻塞队列有空位,正常插入元素,并
        _bq.push(in);
        pthread_cond_signal(&_empty); // 发送信号,表示队列不再为空

        pthread_mutex_unlock(&_mtx);
    }

    // 消费者进程
    void pop(T *out)
    {
        lockGuard lockguard(&_mtx);
        // pthread_mutex_lock(&mtx_);
        while (isQueueEmpty())  // 如果队列为空,等待生产者制造任务
            pthread_cond_wait(&_empty, &_mtx);

        // 此时队列内有任务,
        *out = _bq.front(); // 拿_bq的头部元素,并执行pop(拿任务+销毁)
        _bq.pop();

        pthread_cond_signal(&_full);

        pthread_mutex_unlock(&_mtx);
    }

private:
    std::queue<T> _bq;   // 阻塞队列
    int _capacity; // 容量上限
    pthread_mutex_t _mtx;   // 互斥锁: 保证队列安全
    pthread_cond_t _empty; // 表示bq是否为空
    pthread_cond_t _full; // 表示bq是否为满 
};

conProd.cc(生产者消费者模型 主进程)

该文件中包含以下函数:

  • myAdd 函数:一个简单的加法函数,即实际执行任务所执行的函数

  • consumer 函数消费者线程的执行函数。该函数从阻塞队列中获取任务,并执行任务的函数。

  • productor 函数生产者线程的执行函数。该函数随机生成两个整数参数,创建一个任务对象,并将任务对象插入到阻塞队列中。

  • main 函数主函数,用于创建并启动多个消费者线程和生产者线程。通过调用 pthread_create 创建线程,并通过 pthread_join 等待线程结束。

完整代码:

#include "blockQueue.hpp"                                                           
#include "Task.hpp"    
#include <pthread.h>
#include <unistd.h> 
      
// 加法函数,用于生产者进程产生任务
int myAdd(int x, int y)    
{    
    return x + y;    
}    

// 消费者进程
void *consumer(void* args)    
{  
  // 将获得的agrs 参数 强制转化为BlockQueue<Task>* 类型 并赋值给变量bqueue
  BlockQueue<Task>* bqueue = (BlockQueue<Task>*)args;
  
  while(true)
  {
    // 获取任务
    Task t;
    bqueue->pop(&t); // 执行任务 + 销毁
    // 打印任务信息,因为我们使用的仅仅是一个加法函数,所以直接打印"+"
    cout << pthread_self() << " consumer: " << t._x << " + " << t._y << " = " << t() << endl;
  }

  return nullptr;
}

// 生产者进程
void* productor(void* args)
{
  BlockQueue<Task>* bqueue = (BlockQueue<Task>*)args;

  while(true)
  {
    // 制造任务
    // 生产者将任务传到缓冲区,消费者再将其消耗
    // 任务不一定有生产者制造,也可能通过外部获得

    // 随机产生x, y两个参数,执行Task
    int x = rand() % 10 + 1;
    usleep(rand() % 1000);  
    int y = rand() % 5 + 1;

    Task t(x, y, myAdd);
    // 发送任务
    bqueue->push(t);
    // 输出消息
    cout << pthread_self() << " productor: " << t._x << " + " << t._y << " = ?" << endl;
    sleep(1);
  }
  return nullptr;
}

int main()
{
  // getpid():获取当前进程的进程ID(PID),用于区分不同的进程。
  // 0x11451 用于增加种子的随机性
  srand((uint64_t)time(nullptr) ^ getpid() ^ 0x11451);

  BlockQueue<Task>* bqueue = new BlockQueue<Task>();

  pthread_t con[2], pro[2]; // 声明两个消费者 / 生产者,增加并行性
  // 可以将 &con[1] 换为 con+1
  pthread_create(&con[0], nullptr, consumer, bqueue);
  pthread_create(&con[1], nullptr, consumer, bqueue);
  pthread_create(&pro[0], nullptr, productor, bqueue);
  pthread_create(&pro[1], nullptr, productor, bqueue);

  // 执行完毕,等待进程销毁
  pthread_join(con[0], nullptr);
  pthread_join(con[1], nullptr);
  pthread_join(pro[0], nullptr);
  pthread_join(pro[1], nullptr);

  delete bqueue; // 销毁队列
  
  return 0;
}

执行结果

在这里插入图片描述

根据上面的执行结果,可以看出,程序先连续生产(即加锁信息的打印),阻塞队列满了后开始消费,后面重复 生产消费(即加锁解锁)的步骤。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/908909.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从来不懂K8s的人10分钟内将应用跑在了K8s中

大家可能都听说过 K8s 或者 docker &#xff0c;可能有容器编排的概念&#xff0c;知道这会提高运维效率&#xff0c;但是由于上手难度高迟迟没有学习它。 今天我以自己的实际经历教大家将自己的应用在10分钟内部署到k8s中&#xff0c;你不需要懂任何的 docker 命令和 k8s 命令…

LinkedList

LinkedList的模拟实现&#xff08;底层是一个双向链表&#xff09;LinkedList使用 LinkedList的模拟实现&#xff08;底层是一个双向链表&#xff09; 无头双向链表&#xff1a;有两个指针&#xff1b;一个指向前一个节点的地址&#xff1b;一个指向后一个节点的地址。 节点定…

STM32单片机实现Bootloader跳转的关键步骤

感谢关注&#xff01; 本期话题 现在越来越多的嵌入式设备支持远程自动升级&#xff0c;不需要再借助下载器。这样对于设备的维护非常方便。 当然若使设备支持远程升级&#xff0c;需要编写支持升级的程序代码&#xff0c;可以称之为 BootLoader。 也就是说&#xff0c;将设…

【二叉树构建与遍历3】先序遍历+后序遍历构建一个满二叉树并输出中序遍历 C++实现

注意&#xff1a;根据先序遍历与后序遍历只有在满二叉树的情况下才能确定一个唯一的树。这里介绍的是根据先序遍历后序遍历构建一个满二叉树并输出中序遍历顺序。 思路&#xff1a; 先来一个例子&#xff1a; 先序遍历序列为&#xff1a;FDXEABG 后序遍历序列为&#xff1a;…

股票委托接口的部分源码分析(一)

对于一些股票委托接口的源码分析需要具体指定的交易系统可能有不同的接口实现。以下是对一个常见的股票委托接口实现的源码分析示例&#xff1a; import requestsdef place_order(symbol, price, quantity, side): url https://example.com/api/place_order payload {…

gRPC 客户端调用服务端需要连接池吗?

发现的问题 在微服务开发中&#xff0c;gRPC 的应用绝对少不了&#xff0c;一般情况下&#xff0c;内部微服务交互&#xff0c;通常是使用 RPC 进行通信&#xff0c;如果是外部通信的话&#xff0c;会提供 https 接口文档 对于 gRPC 的基本使用可以查看文章 gRPC介绍 对于 g…

ClickHouse(二十三):Java Spark读写ClickHouse API

进入正文前&#xff0c;感谢宝子们订阅专题、点赞、评论、收藏&#xff01;关注IT贫道&#xff0c;获取高质量博客内容&#xff01; &#x1f3e1;个人主页&#xff1a;含各种IT体系技术&#xff0c;IT贫道_Apache Doris,大数据OLAP体系技术栈,Kerberos安全认证-CSDN博客 &…

vue开发环境搭建(WebStorm)

一、安装Node.js&#xff0c;搭建Vue环境 1、访问Node.js官网&#xff08;https://nodejs.org/en/download/&#xff09;进行安装包下载。 2、下载成功之后运行安装程序&#xff0c;进行安装。 如果是用安装程序进行安装&#xff0c;在安装过程中会自动进行Nodejs环境变量的配置…

最新两年工作经验总结

最新两年工作经验总结 前言URP的使用1&#xff1a;如何开启URP1、老项目升级为URP2、创建新项目时选择URP创建 2&#xff1a;URP阴影的设置 PolyBrush的使用&#xff08;地图编辑插件&#xff09;制作山峰or低谷边缘柔化雨刷上色制造场景中的物体贴图地形创建容易踩坑的点ProBu…

springboot大文件上传、分片上传、断点续传、秒传的实现

对于大文件的处理&#xff0c;无论是用户端还是服务端&#xff0c;如果一次性进行读取发送、接收都是不可取&#xff0c;很容易导致内存问题。所以对于大文件上传&#xff0c;采用切块分段上传&#xff0c;从上传的效率来看&#xff0c;利用多线程并发上传能够达到最大效率。 …

示例1:FreeRTOS移植详解_基于HAL库工程

1、开发环境 (1)Keil MDK: V5.38.0.0 (2)STM32CubeMX: V6.8.1 (3)MCU: STM32F103C8(F1系列软仿真最方便) (4)ARM编译器&#xff1a;V5(使用V6编译会报错) 2、移植准备工作 (1)用于移植FreeRTOS的基础工程。 时钟已配置好串口已配置好printf已经重定向到串口1 (2)FreeRT…

《YOLO小目标检测》专栏介绍 CSDN独家改进创新实战专栏目录

&#x1f4a1;&#x1f4a1;&#x1f4a1;Yolo小目标检测&#xff0c;独家首发创新&#xff08;原创&#xff09;&#xff0c;适用于Yolov5、Yolov7、Yolov8等各个Yolo系列&#xff0c;专栏文章提供每一步步骤和源码&#xff0c;带你轻松实现小目标检测涨点 &#x1f4a1;&…

【二分查找篇】速刷牛客TOP101 高效刷题指南

文章目录 17、BM17 二分查找-I18、BM18 二维数组中的查找19、BM19 寻找峰值20、BM20 数组中的逆序对21、BM21 旋转数组的最小数字22、BM22 比较版本号23、BM23 二叉树的前序遍历 17、BM17 二分查找-I 思路步骤&#xff1a; step 1&#xff1a;从数组首尾开始&#xff0c;每次取…

wustojc日期格式变化

#include <stdio.h> int main() {char a[10];for(int i0;i<10;i){//用一个耍聪明的方法&#xff0c;全部用数组存储&#xff1b;面向结果编程a[0]getchar();}printf("%c%c%c%c%c%c%c%c%c%c",a[6],a[7],a[8],a[9],a[2],a[0],a[1],a[5],a[3],a[4]);return 0;}…

什么是跳跃表 ? 说一说跳跃表的查询和新增流程 ?

1.什么是跳跃表&#xff08;Skip List&#xff09; 跳跃表是 ZSet 有序列表底层的一种实现&#xff0c;也成为跳表。它通过添加多层链表的方式&#xff0c;用于在有序集合中进行高效的查找操作。 简单跳跃表的结构图&#xff1a; 从图中可以看出跳跃表有这些特征&#xff1a; …

Nginx-URLRewrite伪静态

URLRwrite是指将真实地址隐藏&#xff0c;用户访问是通过伪地址进行访问&#xff0c;这样可以隐藏URL中的传参等等 URLwrite演示&#xff0c;浏览器输入伪URL&#xff0c;回车会跳转到真实URL Rewrite匹配规则 redirect是指当请求伪装地址后&#xff0c;页面会直接跳转到真实…

基于微信小程序的上门维修评价系统_22c7h-

随着科学研究的不断深入&#xff0c;有关上门维修的各种信息量也在成倍增长。面对庞大的信息量&#xff0c;就需要有上门维修系统来提高管理工作的效率。通过这样的系统&#xff0c;我们可以做到信息的规范管理和快速查询&#xff0c;从而减少了管理方面的工作量。 建立基于微信…

聊聊 Docker

聊聊 Docker Docker 是什么&#xff1f; 定义 Docker 是一款 开源的应用容器引擎。 简单来说&#xff0c;就是 以容器虚拟化技术为基础的软件。可以把应用程序和所依赖的包一起打包到一个可移植的镜像中&#xff0c;发布到 Linux 或者 Windows 上运行。&#xff08;代码 运…

数据通信——传输层(传输层概述)

引言 终于到传输层了&#xff0c;网络层还有很多需要补充的&#xff0c;后期在慢慢填补了。 我们看哈&#xff01;在物理层我们设计出来各种硬件&#xff0c;然后使它们在物理上相互连接&#xff0c;信号以比特流的形式进行发送&#xff1b;随后&#xff0c;在数据链路层&#…

Mybatis介绍和搭建(详细搭建步骤)

目录 一、mybatis介绍 官方简介 通俗易懂 二、搭建步骤 1.创建Maven项目 2.创建数据库并建表和相关类 3.创建全局配置文件,配置数据库连接信息 4.配置sql映射文件 5.测试 一、mybatis介绍 官方简介 MyBatis 是一款优秀的持久层框架&#xff0c;它支持自定义 SQL、存…