构建基于 阻塞队列 / 环形队列 的高效生产消费者模型系统

news2024/10/12 12:10:40
1. 生产者-消费者问题 概述

生产-消费者模型 :一个或多个 生产者线程 产生数据并将其放入共享缓冲区,同时一个或多个 消费者线程 从该缓冲区中读取数据进行操作的情景。

缓冲区 是一个用于存储生产者产生数据的中间容器;缓冲区 的容量通常是有限的,只能容纳一定数量的数据项。

生产 - 消费者模型的核心问题: 1. 线程与线程之间的 同步互斥 关系;生产者与消费者之间的 等待唤醒 机制。

2. 阻塞队列

在这里插入图片描述

2.1 工作原理
  • 队列为空时,消费者从队列中消费数据时,会被阻塞挂起,直至队列中有元素可用;

  • 队列为满时,生产者向队列中生产数据时,会被阻塞挂起,直至队列中有空闲位置;

  • 使用 条件变量(pthread_cond_t 类型)+ 锁(pthread_mutex_t 类型)实现 等待 和 唤醒 机制。

2.2 框架
#include <iostream>
#include <queue>
#include <pthread.h>

template<class T>
class BlockQueue
{
public:
	BlockQueue(int cap) :_cap(cap), _producer_wait_num(0), _consumer_wait_num(0)
    {
        pthread_mutex_init(&_mutex, nullptr);
        
        pthread_cond_init(&_producer_cond, nullptr);
        pthread_cond_init(&_consumer_cond, nullptr);
    }
    
	~BlockQueue() 
    {
        pthread_mutex_destroy(&_mutex);
        
        pthread_cond_destroy(&_producer_cond);
        pthread_cond_destroy(&_consumer_cond);
    }    
private:
    queue<T> _block_queue; // 生产者生产任务,消费者处理任务
	int _cap; // 容量
    
    pthread_mutex_t _mutex; // 保护共享资源的互斥锁
    
    pthread_cond_t _producer_cond; // 用于唤醒生产者生产
    pthread_cond_t _consumer_cond; // 用于唤醒消费者消费
    
    int _producer_wait_num; // 记录阻塞挂起的生产者数量
    int _consumer_wait_num; // 记录阻塞挂起的消费者数量
};

pthread_mutex_t _mutex; // 保护共享资源的互斥锁

解释 “为什么消费者和生产者共用一把锁” :

  1. 阻塞队列是一个共享资源,生产者和消费者都需要访问同一段内存资源来插入和删除元素;为了防止数据竞争和不一致,必须保证同一时间只有一个线程可以访问队列

  2. 使用单个锁可以确保生产者和消费者访问队列时不会发生冲突。

2.3 生产消费
  • 生产
template<class T>
class BlockQueue
{
public:
    void Enqueue(T& in)
    {
        phtread_mutex_lock(&_mutex); // 加锁
        while (_block_queue.size() == _cap) // 阻塞队列为满
        {
            ++_producer_wait_num;
            pthread_cond_signal(&_consumer_cond); // 唤醒消费者线程
            pthread_cond_wait(&_producer_cond, &_mutex); // 阻塞挂起, 释放锁
            --_producer_wait_num;
        }
        
        _block_queue.push(in);
        if (_consumer_wait_num > 0) // 唤醒 阻塞挂起的消费者线程
            pthread_cond_signal(&_consumer_cond); 
        pthread_mutex_unlock(&_mutex); // 解锁
    }
};
  • 消费
template<class T>
class BlockQueue
{
public:
    void Pop(T* out)
    {
        pthread_mutex_lock(&mutex);
        while (_block_queue.empty())
        {
            ++_consumer_wait;
            pthread_cond_signal(&_producer_cond); // 唤醒生产者进行生产
            phread_cond_wait(&consumer_cond, &mutex);
            --_consumer_wait;
		}
        
        *out = _block_queue.front();
        _block_queue.pop();
        if (_producer_wait_num > 0)
            	pthread_cond_signal(&_producer_cond);
        pthread_mutex_unlock(&mutex);
	}
};
3. 环形队列 + 信号量

3.1 信号量

信号量(semaphore) 是一种用于控制多个线程对共享资源的访问的同步机制。

举个形象的例子:

如果把阻塞队列比作电影院,那么信号量就能把电影院的座位划分为可用座位(空位)和已占用座位(已填位),供线程同步使用。

进一步解释:

阻塞队列确保同一时间只有一个线程可以访问电影院的入口门(即队列的互斥访问);信号量的出现,使得不同线程可以访问不同的座位(即队列的不同位置),从而实现高效的同步和资源管理。

// 头文件
#include <semaphore.h>

// 信号量类型: sem_t

int sem_init(sem_t* sem, int pshared, unsigned int value);
// sem: 指向要初始化的信号量的指针;
// pshared: 指定信号量是否可以在多个进程间共享;设置为 0,信号量仅在当前进程(各个线程之间)中共享;
// value: 初始化信号量的值,这个值表示信号量的初始计数值,用于控制资源的数量。

int sem_destroy(sem_t* sem);

int sem_wait(sem_t* sem); // P 操作
// 调用 sem_wait 时,如果信号量的值大于 0,函数会立即将信号量的值-1 并返回;
// 如果信号量的值为 0,调用该函数的线程会被阻塞挂起,直至信号量的值大于 0。

int sem_post(sem_t* sem); // V 操作
// 调用 sem_post 时,信号量的值会 +1;
// 如果此前有线程因调用 sem_wait 而被阻塞,那么其中一个被阻塞的线程会被唤醒并继续执行。

// 返回值:success -> 0 ; fail -> -1, 并设置相应的错误码

引入信号量,可以让环形队列中同时存在两把互斥锁,用于处理多生产多消费的场景,而不用担心生产者与消费者线程之间出现数据竞争和不一致的情况。

定义信号量: _room —— 生产者申请空间资源_data —— 消费者申请数据资源

初始化: sem_init(&_room, 0, _cap); sem_init(&_data, 0, 0 );

不考虑环形队列中空出一个位置作为分隔,当 生产者 和 消费者 指向同一个位置时,只可能是以下两种情况:

  1. 队列为空。

    消费者无法申请到当前位置的数据资源,只能被阻塞挂起;

    生产者可以在当前位置申请到空间资源。

  2. 队列为满。

    生产者无法申请到当前位置的空间资源,只能被阻塞挂起;

    消费者可以在当前位置申请到数据资源。

3.2 框架
template <class T>
class RingQueue
{
public:
    RingQueue(int cap) :_cap(cap), _ring_queue(cap), _producer_pos(0), _consumer_pos(0)
    {
        pthread_mutex_init(&_producer_mutex, nullptr);
	    pthread_mutex_init(&_producer_mutex, nullptr);
        
        sem_init(&_producer_sem, 0, cap);
        sem_init(&_consumer_sem, 0, 0);
    }
    
    ~RingQueue()
    {
        sem_destroy(&_producer_sem);
        sem_destroy(&_consumer_sem);
        
        pthread_mutex_destroy(&_producer_mutex);
        pthread_mutex_destroy(&_consumer_mutex);
	}
private:
    vector<T> _ring_queue;
    int _cap;
    
    int _producer_pos;
    int _consumer_pos;
    
    sem_t _producer_sem;
    sem_t _consumer_sem;
    
    pthread_mutex_t _producer_mutex;
    pthread_mutex_t _consumer_mutex;
};
3.3 生产 与 消费
template <class T>
class RingQueue
{
private:
    int P(sem_t& sem)
    {
        return sem_wait(&sem);
    }
    
    int V(sem_t& sem)
    {
        return sem_post(&sem);
    }
public:
	void Enqueue(T& in)
    {
        P(_producer_sem); // 先申请资源,再申请锁
        // 运行到这里,就一定申请到了合法资源!!
        pthread_mutex_lock(&_producer_mutex);
        
        _ring_queue[_producer_pos++] = in;
        _producer_pos %= _cap;
        
        pthread_mutex_unlock(&_producer_mutex);
        V(_consumer_sem);
    }
    
    void Pop(T* out)
    {
        p(_consumer_sem);
        pthread_mutex_lock(&_consumer_mutex);
        
        *out = _ring_queue[_consumer_pos++];
        _consumer_pos %= _cap;
        
        pthread_mutex_unlock(&_consumer_mutex);
        V(_producer_sem);
    }
};
4. 使用阻塞队列 / 环形队列构建生产消费者模型
4.1 生产-消费者模型概述

生产者-消费者模型是一类多线程编程问题,涉及一次性创建多个生产者线程和消费者线程,并将其高效组织起来;

生产者线程负责生产任务并将其放入共享队列中,消费者线程负责将任务取出并执行;

阻塞队列 / 环形队列 作为底层容器,用于安全地存储和传输这些任务,确保生产者与消费者之间的同步和互斥。

生产-消费者模型的宏观逻辑如下:

  1. 创建阻塞队列 / 环形队列
  2. 创建生产者线程
  3. 创建消费者线程
  4. 启动所有线程
  5. 等待线程终止
template<class T>
using blockqueue_t = blockqueue<T>;

int main()
{
    blockqueue_t<int> *bq = new blockqueue_t<int>(5);
    vector<Thread<blockqueue_t<int>>> threads; // Thread 为封装的类

    Producer(threads, *bq, 1);
    Consumer(threads, *bq, 1);

    StartAll(threads);
    WaitAll(threads);

    return 0;
}

其中,最重要的是确保所有生产者线程和消费者线程都能访问同一份共享资源

以创建生产者线程为示例,展示如何确保所有生产者线程能够访问同一份共享资源。

4.2 创建生产者线程
  • Producer(threads, *bq, 1); // main() 创建生产者线程

Producer 函数的形参中,要通过引用传递 threadsbq ,确保所有生产者线程可以访问同一个共享队列 。

void ProducerCore(blockqueue_t<int>& bq)
{
    int cnt = 10;
    while (true)
    {
        bq.Enqueue(cnt);
        cout << "Producer" << " send a task : " << cnt-- << endl;
        sleep(1);
    }
}

void Producer(vector<Thread<blockqueue_t<int>>> &threads, blockqueue_t<int> &bq, int threadnum)
{
    for (int num = 0; num < threadnum; num++)
    {
        string name = "producer-" + to_string(num + 1);
       
        threads.emplace_back(bq, ProducerCore, name);
    }
}
4.3 完整代码
#include "Thread.hpp" // Thread.hpp 为封装的线程类型
using namespace MyThread;
#include <iostream>
using namespace std;
#include <vector>
#include <string>
#include <unistd.h>
#include "BlockQueue.hpp"

template<class T>
using blockqueue_t = blockqueue<T>;

void ProducerCore(blockqueue_t<int>& bq)
{
    int cnt = 10;
    while (true)
    {
        bq.Enqueue(cnt);
        cout << "Producer" << " send a task : " << cnt-- << endl;
        sleep(1);
    }
}

void ConsumerCore(blockqueue_t<int>& bq)
{
    sleep(3);
    while (true)
    {
        int data = 0;
        bq.Pop(&data);
        cout << "Consumer" << " execute the task : " << data << endl;
        sleep(1);
    }
}

void Producer(vector<Thread<blockqueue_t<int>>> &threads, blockqueue_t<int> &bq, int threadnum)
{
    for (int num = 0; num < threadnum; num++)
    {
        string name = "producer-" + to_string(num + 1);
        threads.emplace_back(bq, ProducerCore, name);
    }
}

void Consumer(vector<Thread<blockqueue_t<int>>> &threads, blockqueue_t<int> &bq, int threadnum)
{
    for (int num = 0; num < threadnum; num++)
    {
        string name = "consumer-" + to_string(num + 1);
        threads.emplace_back(bq, ConsumerCore, name);
    }
}

void StartAll(vector<Thread<blockqueue_t<int>>> &threads)
{
    for (auto &thread : threads)
    {
        thread.Start();
    }
}

void WaitAll(vector<Thread<blockqueue_t<int>>> &threads)
{
    for (auto &thread : threads)
    {
        thread.Join();
    }
}

int main()
{
    blockqueue_t<int> *bq = new blockqueue_t<int>(5);
    vector<Thread<blockqueue_t<int>>> threads;

    Producer(threads, *bq, 1);
    Consumer(threads, *bq, 1);

    StartAll(threads);
    WaitAll(threads);

    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2207885.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【操作系统】四、文件管理:1.文件系统基础(文件属性、文件逻辑结构、文件物理结构、文件存储管理、文件目录、基本操作、文件共享、文件保护)

文件管理 文章目录 文件管理八、文件系统基础1.文件的属性2.文件的逻辑结构2.1顺序文件2.2索引文件2.3索引顺序文件2.4多级索引顺序文件 3.目录文件❗3.1文件控制块FCB3.1.1对目录进行的操作 3.2目录结构3.2.1单级目录结构3.2.2两级目录结构3.2.3多级目录结构&#xff08;树形目…

vue2引入i18n插件实现中英文切换

vue2引入i18n插件实现中英文切换 1.安装i18n插件2.引入3.使用4.数据渲染 1.安装i18n插件 npm install vue-i18n --save-dev注意&#xff1a; vue2环境下安装i18n插件时 有可能会报错&#xff08;我的这个项目比较老&#xff0c;vue2.5.x版本的&#xff09;&#xff0c;报错信息…

保姆级教程 | Linux中grep命令使用 分子动力学轨迹文件输出特定原子电荷值

背景 由于课题需要&#xff0c;现根据lammps运行得到的轨迹需要提取出目标原子的电荷值 步骤 思路 首先确定目标原子在轨迹中的序号&#xff08;lammps每个原子都有自己独立的【分子号原子号】&#xff09; 其次要十分清楚体系中的分子号排序方式&#xff0c;然后只要筛选出…

安卓13禁止锁屏 关闭锁屏 android13禁止锁屏 关闭锁屏

总纲 android13 rom 开发总纲说明 文章目录 1.前言2.问题分析3.代码分析4.代码修改5.彩蛋1.前言 设置 =》安全 =》屏幕锁定 =》 无。 我们通过修改系统屏幕锁定配置,来达到设置屏幕不锁屏的配置。像网上好多文章都只写了在哪里改,改什么东西,但是实际上并未写明为什么要改那…

浅谈虚拟电厂在分布式光伏发电应用示范区中的应用及前景

0引言 随着电力体制改革的持续推进&#xff0c;电力市场将逐步建立和完善&#xff0c;未来的售电主体也将随着配售电业务的逐步放开而日益多元化&#xff0c;新的政策不断鼓励分布式电源和微电网作为独立的配售电市场主体推动运营模式的创新。与微电网所采取的就地应用为控制目…

离散数学-逻辑与证明基础1.4(谓词和量词)

谓词 1.4.2 谓词 涉及变量的语句&#xff0c;例如&#xff1a; “ x > 3 x > 3 x>3”&#xff0c;“ x y 3 x y 3 xy3”&#xff0c;“ x y z x y z xyz” 以及 \quad “Computer x x x is under attack by an intruder” \quad “Computer x x x is f…

nginx虚拟主机配置与locaion规则

目录 1.虚拟主机 1.1分类 1.2基于域名的虚拟机 1.2.1测试 1.3基于端口的虚拟主机 1.3.1测试 ​编辑1.4基于IP的虚拟主机 2.nginx日志 3.location 1.虚拟主机 虚拟主机:相当于1个网站&#xff0c;在nginx中通过server{}区域实现。 nginx虚拟主机有不同的配置类型…

科研论文必备:10大平台和工具助你高效查找AI文献

申博、留学、评职称的同学&#xff0c;逃不过要发表论文。对很多人尤其是对于论文新手来说&#xff0c;写论文可能是一个极具挑战性的过程。今天Bulu分享以下10个论文平台、论文检索工具&#xff0c;会大大提高论文撰写效率&#xff0c;告别熬夜肝论文&#xff01;建议收藏哦&a…

【原创】java+springboot+mysql劳动教育网系统设计与实现

个人主页&#xff1a;程序猿小小杨 个人简介&#xff1a;从事开发多年&#xff0c;Java、Php、Python、前端开发均有涉猎 博客内容&#xff1a;Java项目实战、项目演示、技术分享 文末有作者名片&#xff0c;希望和大家一起共同进步&#xff0c;你只管努力&#xff0c;剩下的交…

78.【C语言】EOF的解释

1.cplusplus网的介绍 在这几篇文章提到过,但没有详细阐释过EOF的细节 24.【C语言】getchar putchar的使用E4.【C语言】练习&#xff1a;while和getchar的理解32.【C语言】详解scanf 75.【C语言】文件操作(3) cplusplus网的介绍 点我跳转 翻译 常量 EOF 文件结束(End-Of-Fi…

STM32F103C8T6 - 定时器

一、定时器简介 定时器总共分为4部分&#xff0c;8小结。 第一部分&#xff08;定时中断、内外时钟源选择&#xff09;&#xff1a;定时器基本定时计数功能&#xff0c;定一个时间&#xff0c;让定时器每隔一段时间定时中断一次 。 第二部分&#xff08;输出比较&#xff09…

21年408数据结构

第一题&#xff1a; 解析&#xff1a;q指针指向要被删除的元素&#xff0c;当这个元素是链表中唯一一个元素时&#xff0c;q指针和尾指针都指向同一个元素&#xff0c;那么在删除掉这个元素之前&#xff0c;需要将尾指针调整到指向头指针的位置&#xff0c;此时链表为空&#x…

【C++】——继承(下)

【C】——继承&#xff08;下&#xff09; 5 继承与友元6 继承与静态成员7 多继承7.1 继承模型7.2 菱形继承的问题7.3 虚继承7.4 多继承中的指针偏移问题 8 组合与继承 5 继承与友元 友元关系不能被继承。即一个函数是父类的友元函数&#xff0c;但不是子类的友元函数。也就是说…

独立站外链策略如何确保SEO效果最大化?

在SEO优化中&#xff0c;外链的建设是不可忽视的重要环节。特别是独立站外链&#xff0c;它不仅能够提升网站在搜索引擎中的排名&#xff0c;还能通过高质量的dofollow链接&#xff0c;促进谷歌对网站的快速收录。那么该如何建立一套有效的独立站外链策略&#xff1f; 首先&…

与C++内存管理和STL简介的爱恨情仇

本文 1.C/C内存分布2.C语言中动态内存管理方式&#xff1a;malloc/calloc/realloc/free总结 3.C内存管理方式new/delete操作内置类型new和delete操作自定义类型 4.operator new与operator delete函数&#xff08;重要点进行讲解&#xff09;5.new和delete的实现原理内置类型自定…

Redis主从复制机制详解

目录 一、主从复制介绍二、搭建主从复制三、主从复制流程四、关于Replication ID五、主从复制核心知识六、主从复制应用场景七、主从复制的注意事项 一、主从复制介绍 1、什么是主从复制&#xff1f; 2、为什么要使用主从复制&#xff1f; redis-server单点故障。单节点QPS…

MyBatis XML映射文件

XML映射文件 XML映射文件的名称与Mapper接口名称一致&#xff0c;并且将XML映射文件和Mapper接口放置在相同包下&#xff08;同包同名&#xff09;XML映射文件的namespace属性为Mapper接口全限定名一致XML映射文件中SQL语句的id与Mapper接口中的方法名一致&#xff0c;并保持返…

MBI6665Q升降压LED驱动芯片车规级AEC-Q100

MBI6665Q是由聚积科技&#xff08;Macroblock Inc.&#xff09;开发的一款多拓扑恒流LED驱动器&#xff0c;主要用于汽车照明应用。凭借其强大的功能集&#xff0c;MBI6665Q可以满足高效照明解决方案的需求&#xff0c;广泛应用于日间行车灯&#xff08;DRL&#xff09;、雾灯等…

idea的maven组件管理依赖小规则

pom文件引入一个依赖&#xff0c;idea会先找到依赖&#xff0c;然后才更新界面&#xff0c;如果找不到&#xff0c;不会更新界面&#xff0c;除非指定正确的版本才会更新界面&#xff0c;更新界面后&#xff0c;再次指定一个错误的版本&#xff0c;idea不会更新界面&#xff0c…

海康大华等厂家摄像头、执法记录仪等通过GB28181注册到LiveGBS平台,如何实时获取设备和通道的在线状态

LiveGBS如何订阅设备状态在线离线状态redis订阅设备或是通道状态subscribe device操作及示例 1、如何监听设备状态2、device订阅2.1、设备上线消息2.2、设备离线消息2.2、通道上线消息2.2、通道离线消息 3、订阅示例3.1、连接REDIS3.2、订阅device示例3.3、设备上线示例3.3.1、…