【Linux】生产者消费者模型:基于阻塞队列,使用互斥锁和条件变量维护互斥与同步关系

news2025/1/13 9:46:39

目录

一、什么是生产者消费者模型

二、为什么要引入生产者消费者模型?

三、详解生产者消费者模型 ​编辑

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

生产者和消费者之间为什么会存在同步关系?

为什么生产者与消费者之间需要一个交易场所?

四、基于阻塞队列的生产者消费者模型

生产者消费者模型的使用场景示例:

五、理解生产者消费者模型代码


一、什么是生产者消费者模型

生产者-消费者模型是一种常见的并发设计模式,用于处理在生产和消费任务之间的协调。这个模型主要用来解决在多线程或多进程环境中,生产者和消费者之间的同步和数据共享问题。

在这个模型中:

  • 生产者:负责生成数据或任务,并将其放入一个共享的缓冲区(也称为队列)中。
  • 消费者:从共享缓冲区中取出数据或任务并进行处理。

缓冲区的作用是实现生产者和消费者之间的解耦,使得生产者和消费者可以在不同的速度下独立工作。

二、为什么要引入生产者消费者模型?

在没有这种模型的情况下,生产者和消费者可能会遇到以下问题:

  • 资源浪费:如果生产者生产速度过快,而消费者消费速度跟不上,可能会导致生产出的产品(通常是数据或任务)被丢弃,造成资源浪费。
  • 资源不足:如果消费者消费速度过快,而生产者生产速度跟不上,消费者可能会因为等待新资源而处于空闲状态,造成资源不足。
  • 竞态条件:在没有适当的同步机制的情况下,多个生产者或消费者同时访问共享资源(如队列)可能会导致数据不一致或状态错误。
  •  死锁:如果生产者和消费者在等待对方释放资源时都阻塞了,可能会导致死锁,即系统无法继续前进。
  • 饥饿:某些消费者可能因为其他消费者不断地获取资源而长时间得不到服务,这种现象称为饥饿。

生产者-消费者模型通过引入缓冲区(如队列)和同步机制(如信号量、互斥锁)来解决这些问题。模型通常包含以下几个关键组件:

  • 生产者:负责生成数据或任务。
  • 消费者:负责处理数据或任务。
  • 缓冲区:存储生产者生产的数据,供消费者使用。
  • 同步机制:确保生产者和消费者之间的协调,避免竞态条件和死锁。

通过这些组件,生产者-消费者模型可以有效地管理资源,确保生产者不会在缓冲区满时生产,消费者不会在缓冲区空时消费,从而实现生产者和消费者之间的平衡和同步。

三、详解生产者消费者模型 

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。(如阻塞队列、环形队列等)

我们在用代码编写生产者消费者模型的时,本质就是对这三个特点进行维护。

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

生产者将生产的数据放入容器中,而消费者又会从容器中取出数据,这就造成了在同一时刻中,容器中的数据可能被多个执行流访问。而该容器其实就是临界资源,对于临界资源,我们必须保护对临界资源操作的原子性,否则容易造成数据错乱。

因此我们必须保证生产者和消费者访问临界资源的操作是串行的。每次访问,我们只允许一个执行流进入临界区。如此,我们就保证了临界资源的安全。

所以,在访问临界区资源之前,无论是生产者还是消费者,必须先去竞争保护临界资源的那把互斥锁。即生产者和消费者会进行同一把锁的竞争!

生产者和消费者之间为什么会存在同步关系?

如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。

虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

当缓冲区为满时,生产者需要在自己的条件变量下阻塞等待,直到有消费者进行消费,缓冲区有空间剩余时,才会继续与消费者竞争临界资源的管理权。

当缓冲区为空时,消费者需要在自己的条件变量下阻塞等待,直到有生产者进行生产,缓冲区有数据时,才会继续与生产者竞争临界资源的管理权。

【注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。】

为什么生产者与消费者之间需要一个交易场所?

生产者与消费者之间需要一个交易场所,通常称为缓冲区(Buffer),是因为这个缓冲区在多线程环境中起到了至关重要的作用。以下是缓冲区的几个关键作用:

  1. 协调生产和消费:缓冲区作为生产者和消费者之间的中介,允许生产者在缓冲区有空间时生产数据,消费者在缓冲区有数据时消费数据。这种协调机制避免了生产者和消费者之间的直接竞争,确保了生产和消费的有序进行。

  2. 解耦生产者和消费者:缓冲区使得生产者和消费者不必同时运行。生产者可以在没有消费者等待的情况下生产数据,消费者也可以在没有生产者生产的情况下消费数据。这种解耦提高了系统的灵活性和效率。

  3. 防止数据丢失:在没有缓冲区的情况下,如果生产者生产速度过快,消费者可能无法及时消费,导致数据丢失。缓冲区提供了一个存储空间,可以暂时保存生产者生产的数据,直到消费者准备好消费。

  4. 提高并行性:缓冲区允许生产者和消费者以不同的速度独立工作,提高了系统的并行性和吞吐量。生产者可以快速生产数据,而消费者可以根据自己的速度消费数据,两者互不干扰。

  5. 避免死锁和饥饿:通过适当的同步机制(如信号量和条件变量),缓冲区可以避免死锁和饥饿问题。生产者在缓冲区满时等待,消费者在缓冲区空时等待,这些等待条件可以通过同步机制得到管理,确保所有线程都能在适当的时候获得资源。

  6. 实现负载均衡:缓冲区可以平滑生产者和消费者之间的负载波动。在生产者负载高时,缓冲区可以存储额外的数据,而在消费者负载高时,缓冲区可以提供足够的数据供消费。

  7. 简化线程管理:缓冲区提供了一个明确的接口,使得线程管理更加简单。生产者和消费者只需要关注缓冲区的状态,而不需要直接管理其他线程。

总之,缓冲区作为生产者与消费者之间的交易场所,是实现高效、稳定和可扩展的多线程系统的关键组件。它通过协调生产和消费、解耦生产者和消费者、防止数据丢失、提高并行性、避免死锁和饥饿以及实现负载均衡等方式,提高了系统的整体性能和可靠性。

四、基于阻塞队列的生产者消费者模型

什么是阻塞队列?它的作用是什么?

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

生产者将数据放入队列,消费者从队列中取出数据。

  • 当队列为空时,消费者会阻塞等待,直到有数据可用;
  • 当队列满时,生产者会阻塞等待,直到有空间可用。 

通过阻塞操作,阻塞队列可以有效地管理资源,避免过度生产或消费,从而提高并发性能。

#pragma once
#include <pthread.h>
#include <iostream>
#include <queue>
#include "Thread.hpp"

static const int MAX_CAPACITY = 6;

template <typename T>
class BlockQueue
{
private:
    std::queue<T> _block_queue;//阻塞队列,临界资源
    int _max_capacity;//队列最大容量
    pthread_mutex_t _mutex;//生产者和消费者之间需要满足互斥关系。因为当生产者在生产时,消费者不能去消费,否则容易造成临界资源错乱。反之亦然
    pthread_cond_t _producer_cond;//当生产资源达到最大容量的时候,生产者需要在生产者的条件变量下等待,当有空余空间时,在进行生产,由消费者告知生产者来生产
    pthread_cond_t _consumer_cond;//当生产资源被消费完之后,消费者需要在消费者条件变量下等待,直到新的资源被生产,由生产者唤醒消费者来进行消费
    //在生产资源未到最大容量和生产资源未空时,消费者和生产者形成互斥关系,竞争临界资源的管理权
public:
    //构造
    BlockQueue(int capacity = MAX_CAPACITY)
    :_max_capacity(capacity)
    {
        pthread_mutex_init(&_mutex, nullptr);//初始化锁
        pthread_cond_init(&_producer_cond, nullptr);//初始化条件变量
        pthread_cond_init(&_consumer_cond, nullptr);
    }

    void Push(const T& in)
    {
        //生产者和消费者竞争同一把锁
        pthread_mutex_lock(&_mutex);

        while(IsFull())//为什么用while,不用if? 防止当有多个生产者时,使用broadcast会造成多个线程被唤醒,而此时多个线程已经经过了if判断。
        //使用if只能判断一次,无法避免时间差所带来的条件改变,因此需要循环检查。
        {
            //生产者在生产者的条件变量处等待,等待时释放锁。被唤醒时重新参与锁的竞争
            pthread_cond_wait(&_producer_cond, &_mutex);
        }
        //走到这里一定不为满
        _block_queue.push(in);
        pthread_mutex_unlock(&_mutex);

        //走到这个地方说明此时阻塞队列中一定有数据,可以唤醒消费者线程
        pthread_cond_signal(&_consumer_cond);
    }

    void Pop(T* out)    //输出型参数,将数据带出来
    {
        //生产者和消费者竞争同一把锁
        pthread_mutex_lock(&_mutex);
        while(IsEmpty())
        {
            //消费者在消费者的条件变量处等待,等待时释放锁,被唤醒时重新参与锁的竞争。竞争到锁之后才能返回
            pthread_cond_wait(&_consumer_cond, &_mutex);
        }
        //走到这里一定不为空
        *out = _block_queue.front();
        _block_queue.pop();
        pthread_mutex_unlock(&_mutex);

        //走到这个地方说明阻塞队列中一定有剩余空间,可以唤醒生产者
        pthread_cond_signal(&_producer_cond);
    }

    bool IsFull()
    {
        return _block_queue.size() == _max_capacity;
    }

    bool IsEmpty()
    {
        return _block_queue.empty();
    }

    //析构
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_producer_cond);
        pthread_cond_destroy(&_consumer_cond);
    }
};

需要注意的是,条件变量的等待必须要放在while循环中进行条件判断。

  • 在多线程环境中,多个线程可能同时操作共享资源。即使一个线程被唤醒,也可能因为其他线程的操作使得条件不再满足。因此,在条件变量的等待过程中,必须持续检查条件是否真的符合期望,确保线程在安全的条件下继续执行。
  • 在pthread_cond_wait函数的等待队列中可能存在多个等待线程。如果使用 if 进行条件判断,仅能在该执行流执行管理资源的代码前进行判断。假如此时阻塞队列中恰好生产了一个数据,而条件变量的等待队列中恰好有多个线程被其他线程使用pthread_cond_broadcast函数同时唤醒,无论

为什么不使用 if 判断呢?

  • pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。
  • 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,此时在pthread_cond_wait函数的等待队列中可能存在多个等待线程,因此就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。
  • 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。

生产者消费者模型的使用场景示例:

生产者承担的工作:不断地将创建的任务放入阻塞队列中。

消费者承担的工作:不断地从阻塞队列中提取任务,并进行执行。

阻塞队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行例程的参数进行传入。

#include "Blocking_Queue.hpp"
#include <functional>
#include <unistd.h>
#include <ctime>
using task_t = std::function<void()>;

void download()
{
    std::cout << "DownLooad ......" << std::endl;
}

void *Productor(void *block_queue)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(block_queue);
    while (true)
    {
        std::cout << "Producing ......" << std::endl;
        bq->Push(download);
        sleep(1);
    }
    return (void*)0;
}

void *Consumer(void *block_queue)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(block_queue);
    while (true)
    {
        sleep(1);
        std::cout << "Consuming ......" << std::endl;
        task_t out;
        bq->Pop(&out);
        out();
    }
    return (void*)0;
}

int main()
{
    BlockQueue<task_t> block_queue;
    pthread_t productor_tid, consumer_tid;
    pthread_create(&productor_tid, nullptr, Productor, static_cast<void *>(&block_queue));
    pthread_create(&consumer_tid, nullptr, Consumer, static_cast<void *>(&block_queue));
    pthread_join(productor_tid, nullptr);
    pthread_join(consumer_tid, nullptr);
    return 0;
}

由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的。

需要注意的是:由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。这点可根据需求自行修改。

五、理解生产者消费者模型代码

看完上述内容后,同学们可能会有这种疑惑:生产者消费者模型不是为了实现高并发而设计的吗?为什么在生产者创建任务时和消费者提取任务时是串行的,而不是并行的?

在回答这个问题之前,我们需要了解一下什么是并发,什么又是并行。

并发(Concurrency):

定义:并发指的是系统能够处理多个任务的能力,这些任务可能是同时进行的,也可能是交替进行的。在并发的场景下,任务可以在同一时间段内交替执行,但不一定是同时的并发更关注的是任务的切换和管理。

并行(Parallelism)

定义:并行指的是系统能够真正地同时执行多个任务。在并行的场景下,多个任务在同一时间段内在不同的处理器或计算核心上同时执行。并行更多地关注的是计算任务的真正同时处理。

在生产者消费者模型中,生产者和消费者对阻塞队列的操作仅仅是向阻塞队列中添加和提取数据,而阻塞队列是共享资源,我们必须对其进行线程安全的保护,让各个执行流串行地去执行临界区代码,保证对临界资源操作的原子性。

添加与提取任务恰恰是生产者消费者模型中执行比较快速的操作。我们可以假设此时有多个生产者线程,此时只能有一个生产者线程能够进入临界区中。那么,在同一时刻,其他的生产者线程一定都在等待进入临界区中呢?答案是否定的!

我们需要知道,今天我们只是使用该模型执行了简单的任务。当遇见复杂任务时生产者需要时间来构建任务!反之,消费者也需要时间来执行任务! 在某个生产者线程向阻塞队列中添加任务时,其他生产者线程可能在等待,也可能在进行任务的生产。消费者线程亦然!

由此我们可以看出,上述代码实现的生产者消费者模型并不是低效的。相反,它有效地管理共享资源的访问、合理安排线程的任务,并确保系统在处理复杂任务时仍能保持高效和稳定。同时解决了生产者与消费者可能出现的忙闲不均的问题,实现了负载均衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2142365.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

信奥学习规划(CSP-J/S)

CSP-J组学习路线规划 CSP-S组学习规划

AbMole带你解密穿心莲:天然植物中的抗癌新星

在浩瀚的自然界中&#xff0c;隐藏着无数未被完全发掘的宝藏&#xff0c;其中&#xff0c;穿心莲&#xff08;Andrographis paniculata&#xff09;作为一种传统的药用植物&#xff0c;正逐渐展现出其在现代医学研究中的独特魅力。近日&#xff0c;一项发表在《Phytomedicine》…

Jemter项目实战(黑马程序员)

视频网址&#xff1a;02测试数据准备_哔哩哔哩_bilibili 自动化脚本架构搭建 新增和修改 新增 删除和查询 弱压力、高并发、高频率 弱压力测试 高并发 高频率 生成图形化报告

深入理解命令模式:行为设计模式的精髓

在软件设计中&#xff0c;命令模式&#xff08;Command Pattern&#xff09;是一种行为设计模式&#xff0c;它将请求封装成对象&#xff0c;从而使你可以用不同的请求对客户进行参数化&#xff0c;并且支持请求的排队、记录日志以及撤销操作。命令模式是设计模式中的一种&…

通信工程学习:什么是EPON以太网无源光网络

EPON&#xff1a;以太网无源光网络 EPON&#xff08;Ethernet Passive Optical Network&#xff0c;以太网无源光网络&#xff09;是一种结合了以太网技术和无源光网络&#xff08;PON&#xff09;优势的高速、大容量宽带接入技术。以下是关于EPON的详细解释&#xff1a; 一、…

【经典文献】双边滤波

文章目录 ICCV 1998基本思路双边高斯滤波 ICCV 1998 1995年&#xff0c;Aurich和Weule提出一种非线性高斯滤波器&#xff0c;三年后&#xff0c;Tomasi和Manduchi将其用于图像平滑&#xff0c;并将其命名为双边滤波。 Aurich, V., & Weule, J. (1995). Non-linear Gaussi…

Git常用指令整理【新手入门级】【by慕羽】

Git 是一个分布式版本控制系统&#xff0c;主要用于跟踪和管理源代码的更改。它允许多名开发者协作&#xff0c;同时提供了强大的功能来管理项目的历史记录和不同版本。本文主要记录和整理&#xff0c;个人理解的Git相关的一些指令和用法 文章目录 一、git安装 & 创建git仓…

蓝桥杯-STM32G431RBT6(串口)

前言 一、配置 二、使用步骤 1.串口发送 代码逻辑 效果展示 2.串口接收单个字符 代码逻辑 中断回调函数 3.串口接受字符串 代码逻辑 字符串函数 中断回调函数 声明 代码开源 前言 一、配置 二、使用步骤 1.串口发送 代码逻辑 sprintf(tx_buf,"jin ke\r\n&…

(学习总结17)C++继承

C继承 一、继承的概念与定义继承的概念继承定义1. 定义格式2. 继承基类成员访问方式的变化 继承类模板 二、基类和派生类间的转换三、继承中的作用域隐藏规则 四、派生类的默认成员函数4个常见默认成员函数实现一个不能被继承的类 五、继承与友元六、继承与静态成员七、多继承及…

DFS:二叉树中的深搜

✨✨✨学习的道路很枯燥&#xff0c;希望我们能并肩走下来! 文章目录 目录 文章目录 前言 一. 深搜的总结 二. 二叉树的深搜题目 2.1 计算布尔二叉树的值 2.2 求根节点到叶节点的数字之和 2.3 二叉树剪枝 2.4 验证二叉搜索树 2.5 二叉搜索树中第k小的节点 2.6 二叉树的…

【C++】——继承详解

目录 1、继承的概念与意义 2、继承的使用 2.1继承的定义及语法 2.2基类与派生类间的转换 2.3继承中的作用域 2.4派生类的默认成员函数 <1>构造函数 <2>拷贝构造函数 <3>赋值重载函数 <4析构函数 <5>总结 3、继承与友元 4、继承与静态变…

在VC++6.0中创建一个C++项目

1、下载并安装VC6.0 暂时不介绍下载安装&#xff0c;后续可能会补充 2、打开VC6.0 初始界面如下图&#xff1a; 3、创建一个空工程 文件-新建 在新建弹框中选择&#xff1a;工程-win32 console Application-填写工程名、选择保存路劲-确定 在新的弹框中&#xff0c;选择&…

BIT小学期-电话号码问题

Output 输出包括两个部分&#xff0c;第一个部分是错误的电话号码&#xff0c;对于这些号码应当按照输入的顺序以原始的形式输出。在输出错误电话号码前输出Error:&#xff0c;随后输出这些号码&#xff0c;如果没有错误的电话号码&#xff0c;则输出Not found. 第二部分是重…

[C++进阶]AVL树

前面我们说了二叉搜索树在极端条件下时间复杂度为O(n),本篇我们将介绍一种对二叉搜索树进行改进的树——AVL树 一、AVL 树的概念 二叉搜索树虽可以缩短查找的效率&#xff0c;但如果数据有序或接近有序二叉搜索树将退化为单支树&#xff0c;查找效率低下。因此&#xff0c;两位…

6个Python小游戏项目源码【免费】

6个Python小游戏项目源码 源码下载地址&#xff1a; 6个Python小游戏项目源码 提取码: bfh3

深度学习Day-33:Semi-Supervised GAN理论与实战

&#x1f368; 本文为&#xff1a;[&#x1f517;365天深度学习训练营] 中的学习记录博客 &#x1f356; 原作者&#xff1a;[K同学啊 | 接辅导、项目定制] 一、 基础配置 语言环境&#xff1a;Python3.8编译器选择&#xff1a;Pycharm深度学习环境&#xff1a; torch1.12.1c…

cout无法正常显示中文

cout无法正常显示中文 虽然你使用了buf.length()来指定写入的字节数&#xff0c;但是在包含中文字符&#xff08;UTF-8编码下每个中文字符占用3个字节&#xff09;的情况下&#xff0c;直接使用length()可能不会正确反映实际的字节数&#xff0c;因为它给出的是字符数而非字节…

基于深度学习,通过病理切片直接预测HPV状态|文献速递·24-09-16

小罗碎碎念 有段时间没有写文献速递的推文了&#xff0c;搞得自己今天写还怪不适应的。 今天所有的推文&#xff0c;都是围绕一个系统的问题展开——既研究了HPV与EBV在头颈癌/鼻咽癌中的致病机制&#xff0c;也总结了如何结合病理组学直接由WSI预测HPV状态——没办法&#x…

变压器漏感对整流电路的影响

目录 1. 电压波形畸变 2. 输出电压波动 3. 电流纹波增加 4. 降低整流效率 5. 影响开关器件的性能 6. EMI&#xff08;电磁干扰&#xff09;增加 总结与应对措施 变压器漏感在整流电路中会产生一些影响&#xff0c;尤其在高频应用或电流变化较大的情况下&#xff0c;其影…

【GESP】C++一级练习BCQM3006,多行输出

多行输出练习题&#xff0c;使用cout或printf函数输出多行内容。 BCQM3006 题目要求 描述 在windows的控制台环境中所有的字符都是等宽的&#xff0c;默认情况下窗口中每行有 80 个字符&#xff0c;每个屏幕有 25 行&#xff0c;组成了一个字符矩阵。利用控制台的这个特点&a…