【Linux】条件变量及生产者消费者模型

news2025/1/12 3:59:57

为什么要将这两者放在一起进行呢?
主要是因为生产消费与条件变量关系密切,正好相辅相成。

目录

  • 条件变量:
    • 条件变量的引出:
    • 条件变量的解释与接口:
    • 测试代码:
  • 生产者消费者模型:
    • 概念:
    • 代码实现Blocking Queue:
    • 完整代码:
    • 测试结果:
    • 一些问题:

条件变量:

条件变量的引出:

我们假设有一个自习室,这个自习室每次只能有一个人进入(使用挂在自习室门前钥匙),其他的人都要排队(排队之前都去试图找钥匙)。

今天小A起了个大早去拿钥匙,一直学到中午,此时饿得受不了了,于是想出去吃饭,他就走到门前打算归还钥匙,可是刚挂到门前就后悔了,因为她不想排队那么久才能继续学习,于是又拿着钥匙将门打开,但是又很饿,就这样循环往复,自己没有得到知识,外面的人也只能干排队等待。

与之对应:
自习室就相当于临界资源,钥匙就是锁,当其中一个线程拿到锁就可以访问临界资源,其他的线程都陷入阻塞状态。
由于线程A继续访问临界资源已经没有意义,但是竞争锁的能力很强,因为距离自己很近,造成了其他线程饥饿问题

虽然线程A这样做合法,但是不道德。
我们此时需要制定规则,保证过程具有一定顺序性,也就是同步。
而条件变量就是确保这个顺序性的!

条件变量的解释与接口:

我们知道了条件变量的作用,但是还是并不清楚条件变量是什么。

我们再来一个例子进行解释。

假设有一个游戏,一个蒙眼拿盘子中的苹果,一个睁眼将苹果放入盘子。

在这里插入图片描述

现在我们就可以对应一下了。
铃铛就是条件变量,他是一个队列,可以让线程进行等待
他的唤醒有两种策略,全部唤醒与单个唤醒。

我们也就可以对应一下条件变量的接口了。
有些接口与锁很类似。
在这里插入图片描述
其中init是条件变量初始化的函数,与锁一致。
signal与broadcast就是通知接口,signal是一次通知一个,boardcast就是全部通知
wait就是去铃铛下等待,也就是去条件变量下等待。
timedwait我们不管。
desory就是销毁条件。

关于这里还有一个细节,为什么条件变量要把锁业传入?
后面会进行解释。

测试代码:

我们目前可以浅浅的使用一下条件变量,熟悉一下接口。

我们现在要实现一个场景,在这里插入图片描述
主要逻辑是先创建一批线程并进行管理,每个创建好的线程都去执行各自的任务,任务是个死循环,每次进入在条件变量下等待被唤醒。

#include <iostream>
#include <pthread.h>
#include <unistd.h>

const int N = 5;

pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;

void *Run(void *args)
{
    while (true)
    {
        pthread_mutex_lock(&g_mutex);
        pthread_cond_wait(&g_cond, &g_mutex);

        std::cout << "i am new thread-" << reinterpret_cast<long long>(args) << std::endl;

        pthread_mutex_unlock(&g_mutex);
    }
}

int main()
{
    pthread_t thds[N];
    for (int i = 0; i < N; i++)
    {
        pthread_create(&thds[i], nullptr, Run, reinterpret_cast<void*>(i));
    }

    // 唤醒
    while (true)
    {
        sleep(1);
        pthread_cond_signal(&g_cond);
    }

    for (auto &val : thds)
    {
        pthread_join(val, nullptr);
    }

    return 0;
}

现象:
在这里插入图片描述

生产者消费者模型:

概念:

条件变量暂时说到这里,我们先来谈一谈这个模型。

我们依旧是拿具体场景进行解释:
这是最简单的一个图示
在这里插入图片描述
实际上,我们的生产者代表的就是供货商,超市是一块缓存,顾客就是消费者。

这个模型有3大优点:

  1. 协调忙闲不均:
    当生产慢时而消费又多,我们就可以提前生产一批产品到超市;
    当生产快时而消费又慢,我们就可以提前让消费者来到超市;
  2. 效率高:
    你在买东西时,供应商在生产,达到了消费生产并发
    你在处理你买的东西时,供应商把商品放入超市,达到放入处理并发
  3. 解耦:
    生产者与消费者互不影响:比如一共有10个供货商,其中一个倒闭了,并不影响你买商品;当你吃泡面撒掉了,也不会影响供应商供货。

高度提炼一下可以简记为:1 2 3 原则。
1:一个交易场所(一段内存空间)
2:两种角色(生产角色,消费角色)
3:三种关系(生产与生产,消费与消费,生产与消费)
其中这三种关系分别为互斥,互斥,互斥&&同步。

相信前两个都很好理解,第三个呢?

比如供应商生产好商品还没录入数据就肯定不能被顾客拿走,这就是互斥。
这就像你在写数据,还没写完就被读走了一样,他们之间也是需要互斥的。
同样,如果超市没货了,我们要要依靠超市通知供应商,等有货了在反过来通知顾客。这就是同步。

代码实现Blocking Queue:

为什么写单对单?因为他简单!为什么不吃牛,因为他善!在这里插入图片描述
不过为什么单对单简单,因为我们的1 2 3原则中有三个关系,生产与生产,消费与消费,生产与消费,单对单就不需要考虑前两种,自然而言的简单了许多。
这里我们要解决一个历史问题:为什么wait要传入锁?

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;
当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
在这里插入图片描述

那么我们现在就要有一个大概的轮廓:

生产者一直写,消费者一直读即可,很基础的框架,有了框架我们就知道如何下手实现主要功能了。

#include "BlockingQueue.hpp"

void *Consumer(void *args)
{
    BlockingQueue *bq = static_cast<BlockingQueue*>(args);

    while (true)
    {
        // 接收数据

        bp.Pop();

        // 处理数据
    }
}

void *Producer(void *args)
{
    BlockingQueue *bq = static_cast<BlockingQueue*>(args);

    while (true)
    {
        // 构建数据

        bp.Push();
    }
}

int main()
{
    BlockingQueue pc;
    pthread_t t1, t2;
    pthread_create(&t1, nullptr, Producer, static_cast<void*>(&pc));
    pthread_create(&t2, nullptr, Consumer, static_cast<void*>(&pc));

    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);

    return 0;
}

这是我们的阻塞队列,里面有对于一些点的详细注释,
比如cond为什么要传入锁?唤醒在哪个位置进行唤醒?
而又因为我们也不确定用户使用的是什么类型,设计为模板即可。

#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

template <class T>
class BlockingQueue
{
public:
    BlockingQueue(int cap = 5) : _cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond_p, nullptr);
        pthread_cond_init(&_cond_c, nullptr);
    }
    void Push(const T &val)
    {
        pthread_mutex_lock(&_mutex);

        if (IsFull())
        {
            pthread_cond_wait(&_cond_p, &_mutex);
            // 所以这里的wait为什么要传锁就一目了然了
            // 因为如果你带着锁去wait,那么别的线程只会一直阻塞,不会拿到锁
            // 所以wait还要进行解锁,等被唤醒在继续抢锁。
        }
        // 出来了就代表肯定不是满的,此时push数据即可。
        _q.push(val);

        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_cond_c);
        // 注意这里的唤醒放在解锁的上或下都可以,当放在下时,我们还是以单对单为例
        // 假设我们此时队列中还没有数据,消费者在wait中解锁阻塞,生产者生产完数据进行解锁再唤醒消费者,
        // 而这时消费者会和生产者再次抢锁,因为消费者会从wait中被唤醒需要抢锁,而生产者在lock中抢锁。
        // 如果消费者抢到了就罢了,直接进行写入数据;但是如果生产者抢到了,那就继续生产数据,
        // 我们假设生产者一直将队列写满,那么他就会在wait中进行阻塞,让消费者写入数据在解锁唤醒。
        // 同理,实际上因为锁的钳制,在解锁前或后唤醒都是可以的
    }
    void Pop(T *val)
    {
        pthread_mutex_lock(&_mutex);

        if (IsEmpty())
        {
            pthread_cond_wait(&_cond_c, &_mutex);
        }
        // 出来了就代表此时数据肯定不为空,可以写给消费者了。
        *val = _q.front();
        _q.pop();

        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_cond_p);
    }
    ~BlockingQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond_p);
        pthread_cond_destroy(&_cond_c);
    }

private:
    bool IsFull()
    {
        return _cap == _q.size();
    }
    bool IsEmpty()
    {
        return _q.empty();
    }

private:
    int _cap;
    std::queue<T> _q;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond_p;
    pthread_cond_t _cond_c;
};

但是这里还有一个细节,我们判断空或满时使用了if,这在某些情况下是会出问题的。

比如当前队列中没有数据。
有一个生产者,两个消费者。
当生产者刚刚生产完,进行了broadcast唤醒,导致两个消费者线程都被唤醒我们假设生产者此时不会抢到锁,所以目前只会有一个消费者线程抢到锁,当其中一个A抢到后,进行写入数据并pop,这都是没有问题的,但是如果A解锁后被另一个消费者B抢到,那么B又会继续写入并进行pop。
问题就出现了,因为唯一的一个数据已经被A拿走了,此时就会出现err,所以我们要对if进行一下修改,改为while,这样就避免了因为B抢到锁而继续pop造成的err。

完整代码:


```cpp
#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

template <class T>
class BlockingQueue
{
public:
    BlockingQueue(int cap = 5) : _cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond_p, nullptr);
        pthread_cond_init(&_cond_c, nullptr);
    }
    void Push(const T &val)
    {
        pthread_mutex_lock(&_mutex);

        while (IsFull())
        {
            pthread_cond_wait(&_cond_p, &_mutex);
            // 所以这里的wait为什么要传锁就一目了然了
            // 因为如果你带着锁去wait,那么别的线程只会一直阻塞,不会拿到锁
            // 所以wait还要进行解锁,等被唤醒在继续抢锁。
        }
        // 出来了就代表肯定不是满的,此时push数据即可。
        _q.push(val);

        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_cond_c);
        // 注意这里的唤醒放在解锁的上或下都可以,当放在下时,我们还是以单对单为例
        // 假设我们此时队列中还没有数据,消费者在wait中解锁阻塞,生产者生产完数据进行解锁再唤醒消费者,
        // 而这时消费者会和生产者再次抢锁,因为消费者会从wait中被唤醒需要抢锁,而生产者在lock中抢锁。
        // 如果消费者抢到了就罢了,直接进行写入数据;但是如果生产者抢到了,那就继续生产数据,
        // 我们假设生产者一直将队列写满,那么他就会在wait中进行阻塞,让消费者写入数据在解锁唤醒。
        // 同理,实际上因为锁的钳制,在解锁前或后唤醒都是可以的
    }
    void Pop(T *val)
    {
        pthread_mutex_lock(&_mutex);

        while (IsEmpty())
        {
            pthread_cond_wait(&_cond_c, &_mutex);
        }
        // 出来了就代表此时数据肯定不为空,可以写给消费者了。
        *val = _q.front();
        _q.pop();

        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_cond_p);
    }
    ~BlockingQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond_p);
        pthread_cond_destroy(&_cond_c);
    }

private:
    bool IsFull()
    {
        return _cap == _q.size();
    }
    bool IsEmpty()
    {
        return _q.empty();
    }

private:
    int _cap;
    std::queue<T> _q;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond_p;
    pthread_cond_t _cond_c;
};

main函数代码:

#include "BlockingQueue.hpp"

#include <random>

void *Consumer(void *args)
{
    BlockingQueue<int> *bq = static_cast<BlockingQueue<int>*>(args);

    while (true)
    {
        sleep(2);
        // 接收数据
        int val;
        bq->Pop(&val);

        // 处理数据
        std::cout << "Consumer receive a data: " << val << std::endl;
    }
}

void *Producer(void *args)
{
    BlockingQueue<int> *bq = static_cast<BlockingQueue<int>*>(args);

    while (true)
    {
        // 构建数据
        int val = rand() % 10 + 1;// [1, 10]
        bq->Push(val);

        std::cout << "Producer produce a data: " << val << std::endl;
    }
}

int main()
{
    srand(time(nullptr));
    BlockingQueue<int> pc;
    pthread_t t1, t2;
    pthread_create(&t1, nullptr, Producer, static_cast<void*>(&pc));
    pthread_create(&t2, nullptr, Consumer, static_cast<void*>(&pc));

    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);

    return 0;
}

测试结果:

在这里插入图片描述
观察到生产者生产顺序为1 9 6 6 6…而消费者也正好是1 9 6 6 6…

注意:我们的生产者消费者不仅仅只能传递最简单的内置类型,也可以进行传递自定义类型!可调用对象也是可以的!

一些问题:

我们的多生产多消费不需要修改代码,因为锁已经帮我们搞定了生产与生产,消费与消费之间的关系。

为什么明明一次只能有一个线程访问队列,但还是说生产消费模型效率高?
因为我们不能只关注生产与消费在缓存的时间!
我们的生产任务或数据需要时间,处理任务或数据需要时间。
也就是说:当其中一个生产者在放任务,其他的生产者在生产任务;
其中一个消费者在拿任务,其他消费者在处理任务。
这种并发才让我们的效率变高!

为什么要在锁之后再进行条件变量wait?
这是因为我们push或pop之前一定会临街资源,临界资源一定是被锁保护起来的,所以设计者才会这样设计接口,我们也才要这样使用。

完~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1944551.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Window版本nginx修改文件访问句柄数被限制,解决大并发量访问时无法响应问题

目录 一、问题背景 二、问题分析 三、解决办法 四、查看nginx连接状态 一、问题背景 Windows版本因为文件访问句柄数被限制为1024了&#xff0c;当大并发量访问时就会无法响应。会有如下错误提示&#xff1a;maximum number of descriptors supported by select() is 1024…

强化学习学习(二)基于价值函数就能做到RL——Q-Learning Q学习

文章目录 Value funtion methods-为什么我们用回了Q函数&#xff1f;Q-iterationQ-Learning (P30) Value funtion methods-为什么我们用回了Q函数&#xff1f; 先回顾一下在AC中的基于V函数的框架&#xff1a; 另一个想法&#xff1a;不依赖梯度&#xff0c;而是直接根据值函…

vue3+vite 实现动态引入某个文件夹下的组件 - glob-import的使用

<template><div class"user-content"><HeaderTitle title"用户详情"></HeaderTitle><div class"main-content"><div><UserForm /></div><div><TableList></TableList></d…

pytest实战技巧之参数化应用

pytest是Python中最流行的测试框架之一。它提供了丰富的功能&#xff0c;可以帮助我们编写高效、可靠的测试用例。其中一个重要的功能就是参数化&#xff0c;它可以让我们用不同的数据组合来运行同一个测试用例&#xff0c;从而 提高测试覆盖率和效率。本文将介绍pytest参数化的…

python之名称空间和作用域(关键字:global和nonlocal的使用)

文章目录 前言1、名称空间和作用域1.1 引言1.2 名称空间1.2.1 内置名称空间1.2.2 全局名称空间1.2.3 局部名称空间1.2.4 名称空间的产生和销毁顺序 1.3 作用域1.3.1 全局作用域1.3.2 局部作用域1.3.3 名字的查找顺序 1.4 关键字&#xff1a;global1.5 关键字&#xff1a;nonloc…

在eclipse中导入本地的jar包配置Junit环境步骤(包含Junit中的方法一直标红的解决方法)

搭建JUnit环境 下文中我用到的本地jar包可以到我上传的资源中下载&#xff0c;不需要积分 链接&#xff1a;https://download.csdn.net/download/weixin_70987470/89571891?spm1001.2014.3001.5503 一、配置环境 跟上一篇的那种方法不一样&#xff0c;直接Add to Build Path …

MySQL的表,视图,索引创建

一。创建表 1。创建Student表 mysql> create table Student(Sno int primary key auto_increment,Sname varchar(30) not null unique,Ssex varchar(2) check (Ssex 男 or Ssex 女) not null,Sage int not null,Sdept varchar(10) default 计算机 not null); 2.创建Cour…

Linux云计算 |【第一阶段】SERVICES-DAY2

主要内容&#xff1a; DNS服务基础及搭建、特殊解析(针对地址库文件&#xff1a;DNS轮询 DNS泛域名解析 DNS别名&#xff09;、缓存DNS&#xff08;全局转发forwarders&#xff09;、DNS递归迭代&#xff08;子域授权&#xff09;、DNS主从架构搭建、DNS主从数据同步 一、DNS工…

基于Element UI内置的Select下拉和Tree树形组件,组合封装的树状下拉选择器

目录 简述 效果 功能描述 代码实现 总结 简述 基于Element UI内置的Select下拉和Tree树形组件&#xff0c;组合封装的树状下拉选择器。 效果 先看效果&#xff1a; 下拉状态&#xff1a; 选择后状态&#xff1a; 选择的数据&#xff1a; 功能描述 1、加载树结构&…

Python 使用TCP\UDP协议创建一个聊天室

server端代码&#xff1a; #encodingutf-8 # 服务端代码 import socketdef server():server_socket socket.socket(socket.AF_INET, socket.SOCK_STREAM)host socket.gethostname()port 12345server_socket.bind((host, port))server_socket.listen(5)print(等待客户端连接…

如何通过一条SQL变更多个分库分表?

数据库发展到今天&#xff0c;分库分表已经不是什么新鲜话题了&#xff0c;传统的单节点数据库架构在数据量和访问频次达到一定规模时&#xff0c;会出现性能瓶颈和扩展性问题&#xff0c;而分库分表技术通过将数据分散到多个数据库实例中来分担负载&#xff0c;从而提升系统的…

粮信甄选·非凡凸现|携手中粮期货、国信证券共同见证数智交易前沿力量

近日&#xff0c;中粮期货、国信证券联合非凸科技在深圳举办了“粮信甄选&#xff0c;非凡凸现”主题机构洽谈会&#xff0c;与行业专家、私募管理人共同探讨如何推进产业与金融的深度融合&#xff0c;以及实现科技创新与生态合作的新模式。 近年来&#xff0c;国信证券始终聚…

开源物联网网关ThingsBoard IoT Gateway

前几天测试了Neuron&#xff0c;这是一个令人印象深刻的平台&#xff0c;不过它不能算是完全免费的平台&#xff0c;因为它还是有商业许可要求的&#xff0c;挺贵的&#xff0c;大几万的&#xff0c;而且它有走向闭源的趋势。所以也在寻找它的替代方案。 今天看到一个ThingsBo…

最新全新UI异次元荔枝V4.4自动发卡系统源码

简介&#xff1a; 最新全新UI异次元荔枝V4.4自动发卡系统源码 更新日志&#xff1a; 1增加主站货源系统 2支持分站自定义支付接口 3目前插件大部分免费 4UI页面全面更新 5分站可支持对接其他分站产品 6分站客服可自定义 7支持限定优惠 图片&#xff1a; 会员中心截图&…

王春城 | TPM是如何减少设备停机时间的?

在快节奏的生产环境中&#xff0c;设备停机时间无疑是每个企业都头疼的问题。它不仅影响生产效率&#xff0c;还可能造成巨大的经济损失。那么&#xff0c;有没有一种神奇的方法能够一键减少设备停机时间呢&#xff1f;答案就是--TPM&#xff08;全面生产维护&#xff09;&…

【区块链+绿色低碳】双碳数字化管控平台 | FISCO BCOS应用案例

地方政府、园区及企业实现“双碳”目标过程中存在一些挑战与难点&#xff1a; 1. 管理者难以掌握完整、准确、全面的碳排放数据进行科学决策&#xff1a;由于碳排放核算需要对数据的来源、核算方法 的规范性和采集方法的科学性有严格要求&#xff0c;当前面临碳排放数据数据采…

什么是PCB盲孔、埋孔和电镀孔?

PCB有不同类型的孔&#xff0c;根据孔贯穿PCB内外层的层次&#xff0c;孔可分为通孔、埋孔和盲孔。 如您所知&#xff0c; PCB 由堆叠在一起的铜箔层组成.这些“过孔”连接PCB上的不同电路层。它类似于具有相互连接的隧道的地下系统。如果你玩过电子游戏“超级马里奥”&#xf…

基于web的物流配送管理系统/基于客户时间窗变化的物流配送管理系统/快递配送管理系统

摘 要 随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&#xff0c;各行各业相继进入信息管理时代&a…

springboot集成redis之字典缓存

什么是redis的字典缓存&#xff1f; Redis的缓存是Redis内部用于存储键值对数据结构的一种基础数据结构。在Redis中&#xff0c;所有的键值对都是通过字典这种数据结构来存储的。字典在Redis中扮演着核心角色&#xff0c;因为它不仅用于数据库中的键值对存储&#xff0c;还用于…

React 学习——条件渲染、遍历循环、事件绑定

React特点&#xff1a; 声明式的设计高效&#xff0c;采用虚拟DOM来实现DOM的渲染&#xff0c;最大限度减少DOM的操作灵活&#xff0c;跟其他库灵活搭配使用JSX&#xff0c;俗称JS里面写HTML&#xff0c;JavaScript语法的扩展组件化&#xff0c;模块化&#xff0c;代码容易复用…