Linux -- 生产消费模型

news2025/3/17 11:33:10

目录

概念

代码

BlockQueue.hpp 代码:

伪唤醒!!

Thread.hpp 代码:

Task.hpp 代码:

test.cc 代码:

再次理解


概念

生产消费模型,也称为生产者-消费者问题,是计算机科学中的一个经典同步问题。它描述了两个或多个共享固定大小缓冲区的进程(或线程)——生产者和消费者之间的交互。生产者负责生成一定量的数据并将其放入缓冲区(生产),而消费者从缓冲区取出数据(消费)并进行处理。这个缓冲区就是用来给生产者和消费者解耦的。

这个模型的主要挑战在于确保生产者不会在缓冲区满时继续添加数据,同时确保消费者不会在缓冲区为空时尝试移除数据。这需要对访问共享资源(即缓冲区)进行适当的同步,以避免竞争条件。

该模型包含三种关系,分别是生产者与生产者之间的关系、消费者与消费者之间的关系、生产者与消费者之间的关系。

代码

BlockQueue.hpp 代码:

伪唤醒!!

伪唤醒(Spurious Wakeup)是指在一个多线程环境中,一个等待条件变量的线程在没有明确原因的情况下被唤醒。

伪唤醒的主要特征是:

  • 无明显原因:线程被唤醒时,预期的条件并没有发生变化或者满足。
  • 不可预测性:无法准确预测何时会发生伪唤醒,它们似乎是随机发生的。
  • 平台依赖性:不同的操作系统或编程语言实现可能有不同的发生概率。

假设现在只有 1 个生产者,但是有 5 个消费者,现在的资源非常短缺,生产者一次生产的量不多,5 个消费者全在等待队列中等待,生产者生产了一个数据,生产者广播,把 5 个消费者全都唤醒了,5 个消费者全都去竞争互斥锁了,消费者 1 竞争成功,进行消费,消费完通知生产者生产,消费者 1 竞争锁成功时,由于代码的健壮性不足,剩下的消费者并没有回到条件变量下的等待队列去等待(因为大家都被唤醒了),而是在互斥锁下的等待队列中等待,等着锁被释放后去竞争锁,消费者 1 消费完后剩下的消费者开始竞争这把锁,但此时生产者还没有生产出数据,即阻塞队列为空消费者 2 竞争锁成功了,只能继续向下走,根本没有判断缓冲区是否为空(因为我们写的是 if 判断),导致消费者 2 在空的阻塞队列中取数据!这是非法的!

简单来说,生产者唤醒了所有的消费者,但是只有部分消费者可以消费剩下的消费者根本没有数据可以消费,但还是被唤醒了,这钟唤醒就是伪唤醒!

把 if 判断 改为 while 判断后,即使消费者 2 唤醒后,抢到了互斥锁,也需要判断阻塞队列是否为空,才可以去访问阻塞队列,才可以消费,如果阻塞队列为空,消费者 2 就去条件变量下的等待队列中等待,这样就不会出现伪唤醒的情况! 

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__

#include<iostream>
#include<string>
#include<queue>
#include<pthread.h>

template<class T>
class BlockQueue
{
private:
    //判断阻塞队列(即缓冲区)是否为满
    bool IsFull()
    {
        return _block_queue.size()==_cap;
    }
    //判断阻塞队列是否为空
    bool IsEmpty()
    {
        return _block_queue.empty();
    }
public:
    BlockQueue(int cap)
        :_cap(cap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_consumer_cond,nullptr);
        pthread_cond_init(&_productor_cond,nullptr);
    }

    //生产者放入数据
    void Enqueue(T &in)
    {
        pthread_mutex_lock(&_mutex);

        //阻塞队列满了,生产者线程等待
        while(IsFull())
        {
            _productor_wait_num++;//等待的数量+1
            pthread_cond_wait(&_productor_cond,&_mutex);
            _productor_wait_num--;//等待的数量-1
        }
        //阻塞队列不是满的才可以放入数据
        _block_queue.push(in);

        //通知消费者线程,可以消费了
        //有消费者线程在等待才唤醒
        if(_consumer_wait_num>0)
            pthread_cond_signal(&_consumer_cond);

        pthread_mutex_unlock(&_mutex);
    }
    //消费者取出数据,out是输出型参数
    void Pop(T *out)
    {
        pthread_mutex_lock(&_mutex);

        //阻塞队列为空,消费者线程等待
        while(IsEmpty())
        {
            _consumer_wait_num++;
            pthread_cond_wait(&_consumer_cond,&_mutex);
            _consumer_wait_num--;
        }
        //阻塞队列不为空,消费者可以取出数据
        *out=_block_queue.front();
        _block_queue.pop();

        //如果生产者线程因为阻塞队列满了在等待,则唤醒生产者线程
        if(_productor_wait_num>0)
            pthread_cond_signal(&_productor_cond);

        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consumer_cond);
        pthread_cond_destroy(&_productor_cond);
    }

private:
    std::queue<T> _block_queue;
    int _cap;//上限
    pthread_mutex_t _mutex;//互斥锁
    pthread_cond_t _consumer_cond;//消费者的条件变量
    pthread_cond_t _productor_cond;//生产者的条件变量

    int _consumer_wait_num;//生产者在等待队列中的数量
    int _productor_wait_num;//消费者在等待队列中的数量
};

#endif

Thread.hpp 代码:

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    template<typename T>
    using func_t = std::function<void(T&)>;
    // typedef std::function<void(const T&)> func_t;

    template<typename T>
    class Thread
    {
    public:
        void Excute()
        {
            _func(_data);
        }
    public:
        Thread(func_t<T> func, T &data, const std::string &name="none-name")
            : _func(func), _data(data), _threadname(name), _stop(true)
        {}
        static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
        {
            Thread<T> *self = static_cast<Thread<T> *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if(!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if(!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if(!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
        T &_data;  // 为了让所有的线程访问同一个全局变量
        func_t<T> _func;
        bool _stop;
    };
} 

#endif

Task.hpp 代码:

#pragma once

#include<iostream>
#include<string>
#include<functional>

using Task=std::function<void()>;

test.cc 代码:

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include<ctime>

using namespace ThreadModule;
using blockqueue_t = BlockQueue<Task>;

void PrintHello()
{
    std::cout << "hello world" << std::endl;
}

void Productor(blockqueue_t &bq)
{
    while(true)
    {
        sleep(1);
        Task t=PrintHello;
        //放入任务
        bq.Enqueue(t);
    }
}

void Consumer(blockqueue_t &bq)
{
    while(true)
    {
        //从bq中取出任务
        Task t;
        bq.Pop(&t);

        //处理任务
        t();
    }
}
void StartComm(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq,func_t<blockqueue_t> func)
{
    for(int i=0;i<num;i++)
    {
        std::string name="thread-"+std::to_string(i+1);
        threads->emplace_back(func,bq,name);
        threads->back().Start();
    }
}
void StartProductor(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq)
{
   StartComm(threads,num,bq,Productor);
}

void StartConsumer(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq)
{
   StartComm(threads,num,bq,Consumer);
}

void WaitAllThread(std::vector<Thread<blockqueue_t>> &threads)
{
    for(auto &thread:threads)
    {
        thread.Join();
    }
}
int main()
{
    // 需要执行的任务
    blockqueue_t *bq = new blockqueue_t(5);
    // 存储线程
    std::vector<Thread<blockqueue_t>> threads;

    StartProductor(&threads,3,*bq);
    StartConsumer(&threads,1,*bq);

    WaitAllThread(threads);

    return 0;
}

再次理解

生产消费模型可以实现并发,但并不是在线程向阻塞队列中放入数据、取出数据这个过程体现的,这个过程是互斥的,不是并发的。

并发体现在当生产者线程放入队列时,消费者线程不一定都是在等待取出数据,不一定都在竞争锁,而可能在拿到数据之后执行各自的任务,即消费者线程可以并发地执行各自的任务。生产者线程也是同理,生产者线程不一定一直都在等待放入数据,也可能并发地在执行生产数据的任务

系统先调度生产者线程还是消费者线程并不重要:

  • 如果先调度了消费者线程,消费者判断阻塞队列为空,就会等待生产者放入数据,直到生产者线程唤醒消费者线程;
  • 如果先调度了生产者线程,生产者判断阻塞队列不为满,就会向阻塞队列中放入数据。 

也就是说,消费者线程的消费会按照生产者线程的步调来进行,消费者不能抢在生产者前面消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2266525.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DevOps实战:用Kubernetes和Argo打造自动化CI/CD流程(1)

DevOps实战&#xff1a;用Kubernetes和Argo打造自动化CI/CD流程&#xff08;1&#xff09; 架构 架构图 本设计方案的目标是在一台阿里云ECS服务器上搭建一个轻量级的Kubernetes服务k3s节点&#xff0c;并基于Argo搭建一套完整的DevOps CI/CD服务平台&#xff0c;包括Argo CD…

React 第二十节 useRef 用途使用技巧注意事项详解

简述 useRef 用于操作不需要在视图上渲染的属性数据&#xff0c;用于访问真实的DOM节点&#xff0c;或者React组件的实例对象&#xff0c;允许直接操作DOM元素或者是组件&#xff1b; 写法 const inpRef useRef(params)参数&#xff1a; useRef(params)&#xff0c;接收的 …

【2024最新】基于Python+Mysql+django的水果销售系统Lw+PPT

作者&#xff1a;计算机搬砖家 开发技术&#xff1a;SpringBoot、php、Python、小程序、SSM、Vue、MySQL、JSP、ElementUI等&#xff0c;“文末源码”。 专栏推荐&#xff1a;SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;Java精选实战项…

利用Java爬虫速卖通按关键字搜索AliExpress商品

在这个信息爆炸的时代&#xff0c;数据的价值日益凸显。对于电商领域的从业者来说&#xff0c;能够快速获取商品信息成为了一项重要的技能。速卖通&#xff08;AliExpress&#xff09;作为全球领先的跨境电商平台&#xff0c;拥有海量的商品数据。本文将介绍如何使用Java语言编…

动态规划<五> 子数组问题(含对应LeetcodeOJ题)

目录 引例 经典LeetcodeOJ题 1.第一题 2.第二题 3.第三题 4.第四题 5.第五题 6.第六题 7.第七题 引例 OJ传送门 Leetcode<53> 最大子数组和 画图分析: 使用动态规划解决 1.状态表示 dp[i]表示以i位置为结尾的所有子数组中的最大和 2.状态转移方程 子数组的问题可以…

前端实现PDF预览的几种选择(pdfjs-dist、react-pdf、pdf-viewer)

记录 PDF预览的选型 对于浏览器自带的PDF预览 如果能直接使用&#xff0c;那自然最好不过了&#xff0c;但考虑多种因素&#xff0c;比如权限问题&#xff0c;禁止用户去下载PDF、预览样式不统一&#xff08;不同浏览器PDF预览的实现不同&#xff09;&#xff0c;所有最终放弃…

小米路由器开启SSH,配置阿里云ddns,开启外网访问SSH和WEB管理界面

文章目录 前言一、开启SSH二、配置阿里云ddns1.准备工作2.创建ddns脚本3.添加定时任务 三、开启外网访问SSH和WEB管理界面1、解除WEB管理页面访问限制2.手动添加防火墙端口转发规则&#xff0c;开启外网访问WEB管理和SSH 前言 例如&#xff1a;随着人工智能的不断发展&#xf…

机器学习(三)-多项式线性回归

文章目录 1. 多项式回归理论2. python通过多项式线性回归预测房价2.1 预测数据2.2导入标准库2.3 导入数据2.4 划分数据集2.5 构建二次多项式特征&#xff08;1, x, x^2&#xff09;2.6 导入线性回归模块2.7 对测试集进行预测2.8 计算均方误差 J2.9 计算参数 w0、w1、w22.10 可视…

【再学javascript算法之美】前端面试频率比较高的基础算法题

基础算法题练习代码&#xff0c;看看能做出几道题 代码实现 找出字符串中出现次数最多的字符 const array "cncnansdnajsadnjasndjnasjdnjj";// 找出出现次数最多的字符 let obj {}; for (let index 0; index < array.length; index) {const element array[…

芯产品|暴雨推出基于兆芯晶片的新品台式机

近期&#xff0c;基于兆芯开先KX-7000系列处理器&#xff0c;暴雨推出新品桌面整机TSJ200-ZX&#xff0c;凭借开先KX-7000系列处理器强劲的性能表现和优异的兼容性&#xff0c;将为行业信创深入发展增添更多的活力和能量。 暴雨TSJ200-ZX是针对政务办公&#xff0c;金融机构和…

echarts进度仪表盘形式

const pointerData 55; // 仪表指针数据const steps 10; // 总共10个步骤 const borderColor {colorStops: [{offset: 0,color: rgba(208, 244, 255, 1)}, {offset: 1,color: rgba(35, 190, 240, 1)}] }; // 边框颜色// 使用数组和循环动态生成颜色数组 const axisLinecolor…

代码随想录-笔记-其八

让我们开始&#xff1a;动态规划&#xff01; 70. 爬楼梯 - 力扣&#xff08;LeetCode&#xff09; 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; class Solution { public:int climbStairs(i…

线性回归a

训练数据 求平方损失的平均值1/n&#xff0c;目标求解w&#xff0c;b使得损失函数最小 显示解

学习记录—正则表达式-基本语法

正则表达式简介-《菜鸟教程》 正则表达式是一种用于匹配和操作文本的强大工具&#xff0c;它是由一系列字符和特殊字符组成的模式&#xff0c;用于描述要匹配的文本模式。 正则表达式可以在文本中查找、替换、提取和验证特定的模式。 本期内容将介绍普通字符&#xff0c;特殊…

利用AI优化SEO关键词提升网站流量的有效策略

内容概要 在数字化时代&#xff0c;网站流量的增加对于任何企业或个人至关重要。为了在竞争激烈的市场中吸引更多用户&#xff0c;优化网站的SEO关键词显得尤为重要。随着人工智能技术的迅猛发展&#xff0c;它在SEO领域的应用也逐渐渗透&#xff0c;为关键词优化提供了新的可…

敏捷开发05:Sprint Planning 冲刺计划会议详细介绍和用户故事拆分、开发任务细分

Sprint Planning 冲刺计划会议简介 Sprint Planning &#xff08;冲刺计划会议&#xff09;&#xff0c;又叫规划会议。此会议通过 Scrum 团队的集体沟通讨论&#xff0c;确定接下来的 Sprint 中要完成的待开发项&#xff0c;把它们组成一个 Sprint Backlog。这些待开发项都是…

极简容器云WeKube快速体验

极简容器云WebKube快速体验 WeKube是什么&#xff1f; 概述 WeKube 是一个基于 Kubernetes 构建的极简Serverless容器服务&#xff0c;它提供了一个简单直观的方式来部署、管理和监控容器化的应用程序。WeKube 的目标是让用户无需关心底层基础设施的具体细节&#xff0c;而是…

Java开发经验——数据库开发经验

摘要 本文主要介绍了Java开发中的数据库操作规范&#xff0c;包括数据库建表规范、索引规约、SQL规范和ORM规约。强调了在数据库设计和操作中应遵循的最佳实践&#xff0c;如字段命名、数据类型选择、索引创建、SQL语句编写和ORM映射&#xff0c;旨在提高数据库操作的性能和安…

ovirt-engine登录报错

ovirt-engine登录报错 注&#xff1a;用户名不是admin&#xff0c;而是adminlocalhost

windows nmake 安装openssl

windows nmake 编译和安装 openssl 本文提供了在Windows环境下安装OpenSSL的详细步骤&#xff0c;包括下载Perl、NASM和VisualStudio&#xff0c;配置环境变量&#xff0c;使用PerlConfigure设置平台&#xff0c;通过nmake进行编译、测试和安装。整个过程涉及32位和64位版本的选…