Linux 生产者消费者模型

news2024/10/5 10:07:18

前言

生产者消费者模型(CP模型)是一种十分经典的设计,常常用于多执行流的并发问题中!很多书上都说他很高效,但高效体现在哪里并没有说明!本博客将详解!

目录

前言

一、生产者消费者模型

1.1 什么是生产者消费者模型?

1.2 生产者消费者模型的特点

1.3 生产者消费者模型的优点

二、基于阻塞队列实现生产者消费者模型

2.1 阻塞队列

2.2 单生产单消费模型

2.3 多生产多消费模型

三、POSIX 信号量

3.1 信号量的基本概念

3.2 信号量的相关操作

四、基于环形队列实现生产者消费者模型

4.1 环形队列

4.2 单生产单消费模型

4.3 多生产多消费模型

• 如何理解生产者消费者模型的效率高?


一、生产者消费者模型

1.1 什么是生产者消费者模型?

• 生产消费者模型(Producer-Consumer Model) 简称CP模型 是 多执行流 并发 的一个经典模型,主要是两个或者多个执行流(进程/线程)通过一个 容器  进行数据共享和通信的机制!

• 在该模型中:  生产者执行流 负责 向 "容器" 中生产数据; 消费者执行流 负责 从 "容器" 中消费数据;容器 一般是指 缓冲区 

什么生产者、消费者、容器?感觉很复杂~!虽然听起来很难,但实际上一点也不简单!哈哈~开个玩笑!其实生产者和消费者模型还是比较简单的,OK,我们下面举个例子理解一下:


我们就以现实中的超市举例子:

超市的工作模式:

顾客只需要到超市购买,供应商只需要向超市供应!即顾客和供应商之间不需要见面,这就做到了很好的 解耦 

超市工作模式的好处是,超市可以提前 缓存 大量的商品!正是超市的缓存机制,可以解决:

1、当顾客多,供应商少,即供货速度慢,消费速度快时,可以先让供应商提前向超市生产商品,再让你顾客进行消费!

2、当顾客少,供应商多,即供货速度快,消费速度慢时,可以先让顾客提前到超市消费,再让供应商生产商品!

这就做到了,允许生产消费的步调不一致,即协调生产者和消费者的 忙闲不均

在这个例子中,顾客就是消费者,供应商就是生产者,超市就是容器(提供交易的场所)!

当然,超市不可能只面向同一个顾客/供应商,而是被多个顾客和供应商同时看见的!也就是说,容器(交易场所)注定是被多执行流所看见的,即他是共享资源,在多线程环境中 ,共享资源被多执行流并发访问时是必须要保证安全的,如何保证? 同步和互斥


在现实中,市场(超市中的货架位置)是有限的,多个同一货品的供应商,为了抢占市场,都会加大促销来排挤对手,例如泡面:某师傅与统某大战

在竞争之下,势必有一家供应商失去市场,所以可以得出:生产者之间是典型的互斥关系


消费者之间是互斥关系

消费者之间,给人的感觉好像是同步关系!但是实际上他们也是互斥的,比如,你和情敌到超市都想买一个红箭的口香糖,但是只有一个了!此时你们只能拼手速抢喽~!


生产者和消费者是同步和互斥关系

比如说,有一天你想吃最喜欢的 脑残酸菜牛肉面 了, 去超市购买,结果没货了,你就失望的走了,回到宿舍刚躺下完了三分钟的手机,看到视屏里面的人在吃,你忍不了了,又跑到超市结果还没有!老板看见了你,说小伙子你加我微信吧,有货了我告诉你!于是你就回去等了,当超市供货商提供了在录入价格的期间,老板说小伙子过来买吧,现在有了!但是由于货物过多,你去的时候价格还没有录入完,老板让你等2分钟把价格录入后,再让你到货架拿 脑残酸菜牛肉面!此时体现的是互斥和同步,互斥是当没录入完价格时不允许你进入拿面,同步是没货时先让供货商供货,然后再让你购买,具有一定的顺序!


1.2 生产者消费者模型的特点

生产者消费者的模型特点,可以总结为321原则

3 种关系

• 生产者和生产者  -> 互斥

• 消费者和消费者  -> 互斥

• 生产者和消费者  -> 互斥&&同步

2 种角色

• 生产者

• 消费者

1 个交易场所

• 通常是一个特定的缓冲区(阻塞队列/环形队列)

注意 :这里的321原则并不是课本上提出的,而是我的恩师蛋哥总结的!


1.3 生产者消费者模型的优点

• 解耦

生产者在生产时,无需关注消费者的状态,只需关注交易场所中是否有空闲位置

消费者在消费时,无需关注生产者的状态,只需关注交易场所中是否有就绪数据

• 协调忙闲不均

• 当生产者速度慢,消费者速度快时,可以先让生产者先生产,再让消费者消费;

• 当生产者速度快,消费者速度慢时,可以先让消费者先消费,再让生产者生产;

• 并发效率高

这个后面实现了生产者消费者模型了解释!


二、基于阻塞队列实现生产者消费者模型

上面刚介绍了,生产者消费者模型中,要有一个交易场所,一般这个场所是 阻塞队列 或者 环形队列!两者的区别是,阻塞队列 是对这个场所整体性使用,而 环形队列 是对这个交易场所划分成多个小场所使用!我们先来介绍整体性使用的即阻塞队列!

2.1 阻塞队列

阻塞队列Blocking Queue)是一种常用于生产者消费者模型的数据结构;是一种特殊的队列,具备 先进先出 FIFO 的特性,与普通的队列不同的是 阻塞队列 大小固定的,也就是存在 容量的!阻塞队列 可以为空,也可以为满

• 阻塞队列入队 -> 生产者进行生产,阻塞队列出队 -> 消费者进行消费; 

• 阻塞队列为满时,进行对生产者阻塞;

• 阻塞队列为空时,进行对消费者阻塞;

是不是和 管道十分的相似!

• 当管道满了将写端阻塞,等读端读取了,即有空间了再让写端来写!

• 当管道为空,阻塞读端,当写端写了再让读端来读!

这也和我们当初介绍管道的特点之一:“管道内部自己维护了同步和互斥的机制”一致!

2.2 单生产单消费模型

我们先来实现一个最简单的,但生产单消费模型,首先搭建一个阻塞队列类框架

我们将阻塞队列放在BlockingQueue.hpp

首先,我们需要一个队列,可以和C语言一样手搓,但是今天STL中有现成的,所以队列就使用 std::queue 了,因为阻塞队列是有容量大小的,所以得使用一个整数记录容量!保障,队列满/空时的阻塞,所以得使用互斥锁和条件变量实现!

所以一个基本的框架如下:

#pragma once

#include <pthread.h>
#include <queue>

template <class T>
class BlockingQueue
{
private:
    // 判断阻塞队列是否为空
    bool IsEmpty()
    {
        // ...
    }

    // 判断阻塞队列是否为满
    bool IsFull()
    {
        // ...
    }
public:
    // 构造
    BlockingQueue(int cap = default_cap)
        : _max_cap(cap)
    {
        pthread_mutex_init(&_mutex);
        pthread_cond_init(&_cond);
    }

    // 析构
    ~BlockingQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    // 生产者 生产(入队)
    void Push(const T& in)
    {
        // ...
    }

    // 消费者 消费(出队)
    void Pop(T* out)
    {
        // ...
    }

private:
    std::queue<T> _block_queue;
    int _max_cap;           // 阻塞队列的容量
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _cond;   // 条件变量(存疑)
};

 OK,大框架有了之后,我们现在的问题就是,把上述的接口实现好即可!我们一个个来:

如何判断阻塞队列为空/为满?

• 判断为空:只需判断std::queue是否为空即可!

• 判断为满:只需判断std::queue的size是否 和 容量相等!

// 判断阻塞队列是否为空
bool IsEmpty()
{
    return _block_queue.empty();
}

// 判断阻塞队列是否为满
bool IsFull()
{
    return _max_cap == _block_queue.size();
}

如何实现 生产者生产数据 即入队?

因为阻塞队列是整体使用的,所以当生产者生产时,消费者就不能动阻塞队列!而他们都可以看到阻塞队列,即阻塞队列是临界资源!如何保障临界资源同一时刻被一个执行流访问?当然是加一把互斥锁了!然后当队列不为满时,入队;否则等待

如何保证等待?

使用条件变量!

但是现在的条件变量只有一个,而消费者和生产者都要等待,如果有一个条件变量的话,会使得编码很复杂,所以我们再加一个条件变量,让他们等到时在各自的条件下等待

// 生产者 生产(入队)
void Push(const T &in)
{
    // 加锁
    pthread_mutex_lock(&_mutex);
    // 判断是否为满
    if(IsFull())// if ?
    {
        // 在生产者的条件下等待
        pthread_cond_wait(&_p_cond, &_mutex);
    }
    // 1、不为满 || 2、重新竞争到锁了
    _block_queue.push(in);
    // 解锁
    pthread_mutex_unlock(&_mutex);
}

如何实现 消费者消费数据 即出队?

首先也是得加锁的,保证在消费者访问时,生产者不能打扰!其次当阻塞队列为空时,消费者应该在他的条件下等待!

// 消费者 消费(出队)
void Pop(T *out)
{
    // 加锁
    pthread_mutex_lock(&_mutex);
    // 判断是否为空
    if(IsEmpty()) // if ?
    {
        // 在消费者的条件下等待
        pthread_cond_wait(&_c_cond, &_mutex);
    }
    // 1、不为空 || 2、重新竞争到锁了
    *out = _block_queue.front();
    _block_queue.pop();
    // 解锁
    pthread_mutex_unlock(&_mutex);
}

现在有个尴尬的问题是:

如果生产者或消费者,因为原先的阻塞队列是满/空而等待时,对方虽然消费/生产了数据,即可以生产/消费了,而对方不知道!所以要想办法唤醒对方!如何唤醒呢?

知道是否可以消费的是生产者,因为当他执行完入队操作时,注定了队列中一定至少有一个元素,所以,让他唤醒消费者最合适,因为只有他可以确保队列中有数据

同理,知道是否可以生产的一定是消费者,以为当他拿走一个数据后,至少队列中有一个位置可以生产(入队),所以让他唤醒生产者~!

所以,当生产者或消费者,执行完生产/消费时,应该唤醒阻塞的对方继续操作

#pragma once

#include <pthread.h>
#include <queue>

template <class T>
class BlockingQueue
{
private:
    // 判断阻塞队列是否为空
    bool IsEmpty()
    {
        return _block_queue.empty();
    }

    // 判断阻塞队列是否为满
    bool IsFull()
    {
        return _max_cap == _block_queue.size();
    }

public:
    // 构造
    BlockingQueue(int cap = default_cap)
        : _max_cap(cap)
    {
        pthread_mutex_init(&_mutex);
        pthread_cond_init(&_c_cond);
        pthread_cond_init(&_p_cond);
    }

    // 析构
    ~BlockingQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_init(&_c_cond);
        pthread_cond_init(&_p_cond);
    }

    // 生产者 生产(入队)
    void Push(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否为满
        if (IsFull()) // if ?
        {
            // 在生产者的条件下等待
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        // 1、不为满 || 2、重新竞争到锁了
        _block_queue.push(in);
        // 解锁
        pthread_mutex_unlock(&_mutex);
        // 唤醒阻塞的消费者
        pthread_cond_signal(&_c_cond);
    }

    // 消费者 消费(出队)
    void Pop(T *out)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否为空
        if (IsEmpty()) // if ?
        {
            // 在消费者的条件下等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        // 1、不为空 || 2、重新竞争到锁了
        *out = _block_queue.front();
        _block_queue.pop();
        // 解锁
        pthread_mutex_unlock(&_mutex);
        // 唤醒阻塞的生产者
        pthread_cond_signal(&_p_cond);
    }

private:
    std::queue<T> _block_queue;
    int _max_cap;           // 阻塞队列的容量
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _c_cond; // 消费者条件变量
    pthread_cond_t _p_cond; // 生产者条件变量
};

单生产但单费阻塞队列,这样就封装好了!我们下面来实现一下上层的调用操作:

上层调用的代码写在test.cc中!生产者去向队列中写入整数(1~10的随机数),消费者从队列中拿出来打印~!

#include "BlockingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <ctime>

void* Consumer(void*args)
{
    BlockingQueue<int> *bq = static_cast<BlockingQueue<int> *>(args);
    while (true)
    {
        // 从阻塞队列获取数据
        int data = 0;
        bq->Pop(&data);
        // 处理数据    
        std::cout << "Consumer -> " << data << std::endl;
    }

    return nullptr;
}

void* Producer(void*args)
{
    BlockingQueue<int> *bq = static_cast<BlockingQueue<int> *>(args);
    srand(time(nullptr) ^ getpid());
    while (true)
    {
        // 生产数据
        int data = rand() % 10 + 1;
        bq->Push(data);
        // 处理数据  
        std::cout << "Producer -> " << data << std::endl;   
    }

    return nullptr;
}

int main()
{
    // 创建一个阻塞队列
    BlockingQueue<int> *bq = new BlockingQueue<int>();
    // 创建两个线程
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, bq);// 消费者
    pthread_create(&p, nullptr, Producer, bq);// 生产者

    // 等待线程
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    delete bq;
    return 0;
}

我们先来直接运行:

此时生产者疯狂生产,消费者疯狂消费!也就是在两者疯狂的打印,不容易看到阻塞队列的特点,为了验证阻塞队列的特点,我们采用休眠的方式,验证:

1、消费者每一秒消费一次,生产者疯狂的生产

预期现象生产者一次性把队列生产满,然后每个一秒消费者打印一次,生产者生产一次

符合预期!

2、生产者每个一秒生产一个,消费者一直消费

预期现象由于一开始生产者休眠,所以消费者阻塞,后面生产者隔一秒生产一个,同时唤醒消费者消费一个,所以就是隔一秒打印一个

OK,符合预期~!


• 一些细节问题补充

虽然上面的单生产但消费的代码,已经可以跑起来了,但是里面还存在一些细节问题,下面我们来进行优化一下:

问题一:在阻塞队列中的Push/Pop中直接使用 if 判断条件是否满足,可能会出现问题!

理由如下:

1、pthread_cond_wait 函数可能调用失效,会造成误唤醒/伪唤醒的情况,如果此时是 if 则会继续向下走,可能导致非法的进行生产/消费

2、如果是多线程情况下,只生产一个,而唤醒所有(伪唤醒),此时是 if,虽然只有一个持有锁的线程才可以访问,但是其他线程此时条件以满足,只是在锁的位置等待,此时会造成多个线程非法的进入临界区,造成资源的损坏

如何解决?只需if 换成 while 即可!这样即使你造成了伪唤醒,在往下执行前会先检查!直到条件在往下继续执行:

    // 生产者 生产(入队)
    void Push(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否为满
        while (IsFull()) // if ?
        {
            // 在生产者的条件下等待
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        // 1、不为满 || 2、重新竞争到锁了
        _block_queue.push(in);
        // 解锁
        pthread_mutex_unlock(&_mutex);
        // 唤醒阻塞的消费者
        pthread_cond_signal(&_c_cond);
    }

    // 消费者 消费(出队)
    void Pop(T *out)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否为空
        while (IsEmpty()) // if ?
        {
            // 在消费者的条件下等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        // 1、不为空 || 2、重新竞争到锁了
        *out = _block_queue.front();
        _block_queue.pop();
        // 解锁
        pthread_mutex_unlock(&_mutex);
        // 唤醒阻塞的生产者
        pthread_cond_signal(&_p_cond);
    }

问题二:在Push/Pop后,需要唤醒对方来执行!唤醒在解锁前后有影响吗?

答案是:没有影响!唤醒对方,在解锁前后都可以!原因是:

1、如果在解锁前唤醒对方,对方没有锁,他会在锁那个位置等待对方解锁!

2、如果在解锁后唤醒对方,对方没有条件变量,在条件变量那里等对方唤醒!

所以在解锁前后唤醒对方是没有影响的!

问题三:阻塞队列的任务中只能放 int 这样的整数吗?

当然不是!我们写的是模板呀!数据类型是T,T可以是int这样的整数,当然也可以是自定义类的对象喽!

我们这里搞一个任务类,Task.hpp 让它实现加法

#pragma once

#include <iostream>

class Task
{
public:
    Task(int x, int y)
        :_x(x),_y(y)
    {}

    Task(){}

    std::string debug()
    {
        return std::to_string(_x) + "+"+std::to_string(_y) +"=" +"?";
    }

    void Excute()
    {
        _result = _x + _y;
    }

    void operator()()
    {
        Excute();
    }

    std::string result()
    {
        std::string msg = std::to_string(_x) + "+"+std::to_string(_y) +"=" + std::to_string(_result);
        return msg;
    }
    
private:
    int _x;
    int _y;
    int _result;
};

此时我们可以:让生产者给两个数,让你消费者计算:

看效果:

格局打开,这里只是放了一个简单计算的任务,我们实际还可以放入更复杂的任务!

比如 网络请求、SQL查询、并行 IO尤其是 IO,使用 「生产者消费者模型」 可以大大提高效率,包括后面的 多路转接,也可以接入 「生产者消费者模型」 来提高效率!


2.3 多生产多消费模型

基于上面的介绍,我们可以实现多生产多消费模型了!其实,经过上面的修改,我们不需要修改上面的代码直接可以适应多生产多消费的场景

OK,我们先来实验一下:

int main()
{
    // 创建一个阻塞队列
    BlockingQueue<Task> *bq = new BlockingQueue<Task>();
    // 创建两个线程
    pthread_t c1, c2, p1, p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq); // 消费者
    pthread_create(&c2, nullptr, Consumer, bq); // 消费者
    pthread_create(&p1, nullptr, Producer, bq); // 生产者
    pthread_create(&p2, nullptr, Producer, bq); // 生产者
    pthread_create(&p3, nullptr, Producer, bq); // 生产者

    // 等待线程
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);

    delete bq;
    return 0;
}

当然这可能会造成屏幕的打印错乱问题,这是因为显示器本质也是文件不同线程向同一个文件写入显示器不就是临界资源吗?所以理论上也要对显示器作保护的!


• 为什么当前单生产单消费的代码不直接修改就可以适用于 多生产多消费场景 呢?

原因很简单:

生产者消费者、都是对同一个阻塞队列做操作,而阻塞队列是整体使用,即每次只允许一个持有锁的线程访问,所以即使多线程过来,也是得先竞争锁资源的!而互斥锁保证了他们是串行的~!

OK,以上就是基于阻塞队列实现的生产者消费者模型了!下面我们来介绍信号量和用信号量基于循环队列的生产者消费者模型!


三、POSIX 信号量

3.1 信号量的基本概念

互斥和同步 不是只能由 互斥锁和条件变量 实现,还能通过 信号量 sem互斥锁实现(出自POSIX标准)

信号量 的本质是一个 计数器描述临界资源中资源数目的计数器

• 申请到资源,计数器 -- (P操作)

• 释放完资源,计数器 ++(V操作)

信号量是描述临界资源数目的,但他也是被所有的线程所共享,即信号量是临界资源,所以对信号量的 PV 操作必须是原子的

如果我们把「生产者消费者模型」中某一临界资源 整体性使用,那他的信号量的值就是 1

• sem 值为 1,表示线程可以生产/消费,执行sem--;

• sem 值为 0,表示线程不能生产/消费,只能阻塞等待;

此时的信号量只有两态,即 1 / 0 ,可以实现互斥锁的效果,即实现线程互斥!像这种只有两态的信号量被称为 二元信号量/二进制信号量

如果我们把「生产者消费者模型」中某一临界资源 分成N份使,那信号量的值就是N:

• 当 sem == N  时,阻塞队列已经空了,消费者无法消费

• 当 sem == 0  时,阻塞队列已经满了,生产者无法生产

• 当 sem != 0 && sem != N 时,申请资源就绪,sem--;资源释放,sem++;

像这种信号量的值被初始化为N,的信号量被称为多元信号量/计数信号量

当一个线程想要访问临界资源时,就必须要申请信号量,当申请成功,继续执行;否则,就阻塞等待,直到有信号量资源可用!如此一来就可以和 条件变量 一样实现 同步 了!

其实 「信号量」 的工作机制类似于 买电影票,是一种 预订机制,只要你买到票了,即使你晚点到达电影院,你的位置也始终可用,买到票的本质是将对应的座位进行了预订!详见:Linux IPC-System V


3.2 信号量的相关操作

有了之前 互斥锁、条件变量 的使用基础,信号量 的接口学习是非常简单的,依旧是只有四个接口:初始化、销毁、申请、释放

初始化信号量

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数

• sem           表示需要初始化的信号量的指针

• pshared    表示当前信号的共享状态,0表示线程间共享,1表示进程间共享

• value         表示信号量的初始值,可以设置多元/二元

返回值

成功返回 0,失败返回 -1,并设置错误码

注意:这一类函数的返回值都是一样的,后面不在介绍

销毁信号量

#include <semaphore.h>

int sem_destroy(sem_t *sem);

参数

待销毁的信号量指针

申请信号量(P操作--)

#include <semaphore.h>

int sem_wait(sem_t *sem);

int sem_trywait(sem_t *sem);

int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

主要使用 :int sem_wait(sem_t *sem);

参数

第一个:sem  表示从哪一个信号量中申请

第二个尝试申请,如果没有申请到资源,就会返回;

第三个:尝试申请,如果没有申请到资源,就会返回,每隔一段时间进行申请,即 timeout

释放信号量(V操作++)

#include <semaphore.h>

int sem_post(sem_t *sem);

参数

将资源释放到哪个信号量中

这批接口属于是看一眼就会用,再多看一眼就会爆炸!所以我们直接来基于上述的接口实现生产者消费者模型!


四、基于环形队列实现生产者消费者模型

4.1 环形队列

生产者消费者模型 中的交易场所是可更换的,不仅可以使用 阻塞队列,还可以使用 环形队列 !

关于环形队列,这里不在多哔哔,我在数据结构的时候手撕过:DS线性表之栈和队列

数组实现的环形队列,麻烦的就是如何判断空和满?判断的方式有两种,以前我在博客中提到了第一种:

1、多开一个空间,当tail + 1 == head 时,表示满;当tail == head 的时候,表示空;

2、搞一个计数器,当计数器的值为 0 时,表示当前为空,当计数器的值为容量时,表示队列为满

这两种策略都可以确保 环形队列 正确判空和判满,至于这里肯定是选择策略二,因为 「信号量」 本身就是一个天然的计数器

环形队列 中,生产者消费者 关心的资源不一样:生产者只关心是否有空间放数据,消费者只关心是否能从空间中取到数据;所以可以搞两个信号量分别标识生产者和消费者的资源数!

• 生产者信号量:标识当前有多少可用空间

• 消费者信号量:标识当前有多少数据

• 生产者的初始值为,环形队列的大小,消费者的初始值为0;

• 无论是生产者还是消费者,只有申请到自己的 「信号量」 资源后,才进行 生产 / 消费、

OK,有了上述的理解,就可以去实现了,我们还是先单生产但消费,然后多生产多消费!

4.2 单生产单消费模型

我们定义一个数组作为循环队列的底层缓冲区,定义两个信号量分别是空间和数据,定义生产者和消费者的下标!为了操作和理解,我们将sem_wait和sem_post封装成P和V:

#pragma once

#include <pthread.h>
#include <semaphore.h>
#include <vector>

const static int default_cap = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t& s)
    {
        sem_wait(&s);
    }

    void V(sem_t& s)
    {
        sem_post(&s);
    }

public:
    RingQueue(int cap = default_cap)
        : _ring_queue(cap),_max_cap(cap),_c_step(0),_p_step(0)
    {
        sem_init(&_space, 0, _max_cap);
        sem_init(&_data, 0, 0);
    }

    ~RingQueue()   
    {
        sem_destroy(&_space);
        sem_destroy(&_data);
    }

    void Push(const T& in)
    {
        // 申请信号量
        P(_space);
        // 生产
        _ring_queue[_p_step] = in;
        _p_step++;
        _p_step %= _max_cap;
        // 释放信号量
        V(_data);
    }

    void Pop(T* out)
    {
        // 申请信号量
        P(_data);
        // 消费
        *out = _ring_queue[_c_step];
        _c_step++;
        _c_step %= _max_cap;
        // 释放信号量
        V(_space);
    }

private:
    std::vector<T> _ring_queue;
    int _max_cap;//容量

    sem_t _space;// 空间信号量
    sem_t _data;// 数据信号量

    int _c_step;// 消费者下标
    int _p_step;// 生产者下标
};

我们还是先用数字测试:

#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <ctime>

void *Consumer(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        // 获取数据
        int data = 0;
        rq->Pop(&data);
        // 处理数据
        std::cout << "Consumer -> " << data << std::endl;
        sleep(1);
    }

    return 0;
}

void *Producer(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        // 生产数据
        int data = rand() % 10 + 1;
        rq->Push(data);
        // 处理数据
        std::cout << "Producer -> " << data << std::endl;
        sleep(1);
    }

    return 0;
}

int main()
{
    srand(time(nullptr));
    RingQueue<int> *rq = new RingQueue<int>();
    
    pthread_t c,p;
    pthread_create(&c, nullptr, Producer,rq);
    pthread_create(&p, nullptr, Consumer,rq);

    // 等待线程
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

为了避免刷屏的效果,我们先各一秒,然后生产和消费:

让消费者一秒读取一次;预期现象:生产者先生成满,然后一秒过后,消费一个,生产一个:

当然,让生产者一秒生产一个,消费者不休眠,现象就是生成一个,消费一个:

细节问题:在没有 互斥锁 的情况下,是如何 确保生产者与消费者间的互斥关系的?

互斥:虽然这里没有直接使用互斥锁,但信号量的操作本身是原子的,这意味着在任何时刻,只有一个线程(生产者或消费者)可以修改信号量的值。因此,通过信号量的机制,间接实现了 生产者和消费者 对环形队列访问的 互斥

同步:信号量的操作确保了生产者和消费者之间的同步。当队列满时,生产者必须阻塞等待消费者消费;当队列空时,消费者必须阻塞等待生产者生产。这种等待和唤醒的PV操作就是同步的体现!

当然我们可不止只会生产和消费整数,因为是模板所以,可以是任意的类型,我们呢可以把上面的阻塞队列的Task.hpp的任务拿过来直接测试:

#pragma once

#include <iostream>

class Task
{
public:
    Task(int x, int y)
        :_x(x),_y(y)
    {}

    Task(){}

    std::string debug()
    {
        return std::to_string(_x) + "+"+std::to_string(_y) +"=" +"?";
    }

    void Excute()
    {
        _result = _x + _y;
    }

    void operator()()
    {
        Excute();
    }

    std::string result()
    {
        std::string msg = std::to_string(_x) + "+"+std::to_string(_y) +"=" + std::to_string(_result);
        return msg;
    }
    
private:
    int _x;
    int _y;
    int _result;
};

这里的运行结果与 阻塞队列 那边的一模一样,证明当前的 「生产者消费者模型」 没有问题(单生产单消费场景中)

注:如果想要提高并发度,可以增大环形队列的容量


4.3 多生产多消费模型

信号量的 PV 操作,保证的是 生产者和消费者 在任意时刻访问 循环队列 时只有一个线程可操作!也就是他保证的是 生产者和消费者之间 的互斥

但是现在是,多执行流即多生产多消费,他们申请到信号量进行 生产/消费 时可能会出现问题,原因是:生产/消费的下标各是一个,也就是会造成对于同一资源的破坏如何解决 生产和生产/消费和消费 之间的 互斥关系 呢?互斥锁

现在的问题是加几把锁?

答案是:两把!因为生产者和消费者关注的资源是不一样!

阻塞队列 中为什么只需要一把锁?
因为阻塞队列中的共享资源是一整个队列,生产者和消费者访问的是同一份资源,所以一把锁就够了

#pragma once

#include <pthread.h>
#include <semaphore.h>
#include <vector>

const static int default_cap = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t &s)
    {
        sem_wait(&s);
    }

    void V(sem_t &s)
    {
        sem_post(&s);
    }

    void Lock(pthread_mutex_t& mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    
    void UnLock(pthread_mutex_t& mutex)
    {
        pthread_mutex_unlock(&mutex);
    }

public:
    RingQueue(int cap = default_cap)
        : _ring_queue(cap), _max_cap(cap), _c_step(0), _p_step(0)
    {
        sem_init(&_space, 0, _max_cap);
        sem_init(&_data, 0, 0);

        pthread_mutex_init(&_c_mutex, nullptr);
        pthread_mutex_init(&_p_mutex, nullptr);
    }

    ~RingQueue()
    {
        sem_destroy(&_space);
        sem_destroy(&_data);
        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }

    void Push(const T &in)
    {
        // 申请信号量
        P(_space);
        // 加锁
        Lock(_p_mutex);
        // 生产
        _ring_queue[_p_step] = in;
        _p_step++;
        _p_step %= _max_cap;
        // 解锁
        UnLock(_p_mutex);
        // 释放信号量
        V(_data);
    }

    void Pop(T *out)
    {
        // 申请信号量
        P(_data);
        // 加锁
        Lock(_c_mutex);
        // 消费
        *out = _ring_queue[_c_step];
        _c_step++;
        _c_step %= _max_cap;
        // 解锁
        UnLock(_c_mutex);
        // 释放信号量
        V(_space);
    }

private:
    std::vector<T> _ring_queue;
    int _max_cap; // 容量

    sem_t _space; // 空间信号量
    sem_t _data;  // 数据信号量

    int _c_step; // 消费者下标
    int _p_step; // 生产者下标

    pthread_mutex_t _c_mutex; // 消费者互斥锁
    pthread_mutex_t _p_mutex; // 生产者互斥锁
};

让消费者先休眠一秒,然后现象应该是 :先生成满,然后一秒消费一个,生产一个:

细节1: 在信号量申请成功之后 加锁,可以提高并发度

上述的,为了防止 生产和生产/消费和消费 对同一资源破坏,需要加互斥锁让他们串行!加锁的位置有两种:1、在申请信号量前加锁 2、在申请信号量以后加锁 这两种都是可以的!但是后者更优!原因如下:

这就好比,你去看电影,你是到时候进放映厅的时候再买票,还是先买票到时候直接进去?

当然是后者喽!原因是信号量的PV本身就是原子的,所以不会出错!所以可以提前申请好信号到时候竞争互斥锁串行访问,即可

细节2:为什么在申请信号量的时候,不需要判断一下条件是否满足?

信号量本质就是一个资源数目的计数器!是一种资源的预定机制!

预定就体现在:可以不判断是否满足,就可以知道内部资源的情况!申请信号量本身就是在条件判断


• 如何理解生产者消费者模型的效率高?

阻塞队列 中,每一个线程执行操作都必须先得加锁,也就是串行操作!循环队列 中,多线程即使并发申请到了信号量,最后也是得申请锁,串行执行操作的!这好像也没多高效吧!这样看确实!但是上面刚说了,我们生产者和消费者不只是处理的是 int 这样的整数,而是大多可能执行的是 网络请求、SQL查询、并行 IO 等,而请求和处理这些操作 本身是很费时间的!

当一个线程在 请求完正在处理这个任务 的同时(花费时间),其他线程去请求,这样不就大大的提高了 并发度 吗?不就是提高了,效率吗!而在这种比较费时间的操作下,加锁和解锁的时间也是可以忽略的!即书上所说的,生产者消费者模型的高效是体现在这里


OK,好兄弟这就是CP模型的所有内容了,我是 cp 我们下期见~!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2189695.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

绝美的登录界面!滑动切换效果

绝美登录界面&#xff01;添加了管理员账号和测试账号 <!DOCTYPE html> <html lang"zh-CN"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><scri…

RC正弦波振荡电路

0、判断电路能否产生正弦波震荡的条件 如上图所示&#xff0c; Xo:输出量&#xff1b; A:放大器的增益&#xff1b; F:反馈系数。 上式分别为RC正弦波震荡器的幅值条件和相位条件&#xff0c;为了使输出量在合闸后能够有一个从小到大直至平衡在一定幅值的过程&#xff0c;电…

《Linux服务与安全管理》| 配置YUM源并验证

《Linux服务与安全管理》配置YUM源并验证 目录 《Linux服务与安全管理》配置YUM源并验证 任务一&#xff1a;配置本地YUM源 任务二&#xff1a;配置网络YUM源 学生姓名 **** 学号 **** 专业 **** 任务名称 配置YUM源并验证 完成日期 **** 任务目标 知识 了解配…

docker安装kafka-manager

kafkamanager docker安装_mob64ca12d80f3a的技术博客_51CTO博客 # 1、拉取镜像及创建容器 docker pull hlebalbau/kafka-manager docker run -d --name kafka-manager -p 9000:9000 --networkhost hlebalbau/kafka-manager# 2、增设端口 腾讯云# 3、修改防火墙 sudo firewall-…

Salesforce AI 推全新大语言模型评估家族SFR-Judge 基于Llama3构建

在自然语言处理领域&#xff0c;大型语言模型&#xff08;LLMs&#xff09;的发展迅速&#xff0c;已经在多个领域取得了显著的进展。不过&#xff0c;随着模型的复杂性增加&#xff0c;如何准确评估它们的输出就变得至关重要。传统上&#xff0c;我们依赖人类来进行评估&#…

【目标检测】yolo的三种数据集格式

目标检测中数据集格式之间的相互转换--coco、voc、yolohttps://zhuanlan.zhihu.com/p/461488682?utm_mediumsocial&utm_psn1825483604463071232&utm_sourcewechat_session【目标检测】yolo的三种数据集格式https://zhuanlan.zhihu.com/p/525950939?utm_mediumsocial&…

Python小示例——质地不均匀的硬币概率统计

在概率论和统计学中&#xff0c;随机事件的行为可以通过大量实验来研究。在日常生活中&#xff0c;我们经常用硬币进行抽样&#xff0c;比如抛硬币来决定某个结果。然而&#xff0c;当我们处理的是“质地不均匀”的硬币时&#xff0c;事情就变得复杂了。质地不均匀的硬币意味着…

【宽搜】4. leetcode 103 二叉树的锯齿形层序遍历

1 题目描述 题目链接&#xff1a;二叉树的锯齿形层序遍历 2 题目解析 根据题目描述&#xff0c;第一行是从左往右遍历&#xff0c;第二行是从右往左遍历。和层序遍历的区别就是&#xff1a; 在偶数行需要从右往左遍历。 因此&#xff0c;只需要在层序遍历的基础上增加一个变…

网络基础:TCP/IP五层模型、数据在局域网传输和跨网络传输的基本流程、IP地址与MAC地址的简单解析

目录 背景介绍 网络协议 OSI七层模型 TCP/IP五层模型 TCP/IP协议与OS的关系 网络协议的本质 数据在局域网传输的基本流程 MAC地址 报文的封装和解包 补充内容 数据的跨网络传输基本流程 IP地址 IP地址和MAC地址的区别 ​​​ 背景介绍 网络的发展经理了四个阶段…

dijstra算法——单元最短路径算法

Dijkstra算法 用来计算从一个点到其他所有点的最短路径的算法&#xff0c;是一种单源最短路径算法。也就是说&#xff0c;只能计算起点只有一个的情况。Dijkstra的时间复杂度是O(n^2)&#xff0c;它不能处理存在负边权的情况。 算法描述&#xff1a; 设起点为s&#xff0c;d…

云原生(四十六) | MySQL软件安装部署

文章目录 MySQL软件安装部署 一、MySQL软件部署步骤 二、安装MySQL MySQL软件安装部署 一、MySQL软件部署步骤 第一步&#xff1a;删除系统自带的mariadb 第二步&#xff1a;下载MySQL源&#xff0c;安装MySQL软件 第三步&#xff1a;启动MySQL&#xff0c;获取默认密码…

【无标题】提升快递管理效率的必备技能:教你批量查询与导出物流信息

在当今快节奏的商业环境中&#xff0c;快递与物流行业的效率直接关系到企业的运营成本和客户满意度。随着订单量的不断增加&#xff0c;如何高效地管理和追踪大量的物流信息成为了企业面临的一大挑战。批量查询与导出物流信息作为一种高效的数据处理手段&#xff0c;正逐渐成为…

信息安全工程师(33)访问控制概述

前言 访问控制是信息安全领域中至关重要的一个环节&#xff0c;它提供了一套方法&#xff0c;旨在限制用户对某些信息项或资源的访问权限&#xff0c;从而保护系统和数据的安全。 一、定义与目的 定义&#xff1a;访问控制是给出一套方法&#xff0c;将系统中的所有功能和数据…

ElliQ 老年身边的陪伴

前记 国庆回家发现爸爸之前干活脚崴了&#xff0c;找个临时拐杖撑住&#xff0c;我心里很不是滋味。虽然总和爸妈说&#xff0c;不要干重活&#xff0c;但老人总是担心成为儿女的负担&#xff0c;所以只要能动&#xff0c;就找活干。 给爸妈一点零花钱&#xff0c;老妈只收了…

多系统萎缩患者的运动指南【健康守护,动出希望】

亲爱的朋友们&#xff0c;今天我们来聊聊一个特别而重要的话题——多系统萎缩患者的运动指南。面对这一挑战&#xff0c;适量的运动不仅能缓解病情&#xff0c;还能提升生活质量。让我们一起&#xff0c;用爱与坚持&#xff0c;为生命加油&#xff01; &#x1f308; ‌为什么…

Linux系统字符命令关机方法对比

一、相同点&#xff1a;都可以达到关机或重启系统的目的。 二、不同点&#xff1a;命令内部的工作过程不同。 1、shutdown 安全的关机命令&#xff1a;系统管理员会通知所有登录的用户系统将要关闭且 login 指令会被冻结&#xff0c;即新的用户不能再登录。根据使用的参数不同…

Spring Boot RESTful API开发教程

一、RESTful API简介 RESTful API是一种基于HTTP协议的Web API&#xff0c;其设计原则是简单、可扩展、轻量级、可缓存、可靠、可读性强。RESTful API通常使用HTTP请求方法&#xff08;GET、POST、PUT、DELETE等&#xff09;来操作资源&#xff0c;使用HTTP状态码来表示操作结…

SysML案例-电磁轨道炮

DDD领域驱动设计批评文集>> 《软件方法》强化自测题集>> 《软件方法》各章合集>> 图片示例摘自intercax.com&#xff0c;作者是Intercax公司总裁Dirk Zwemer博士。

【需求分析】软件系统需求设计报告,需求分析报告,需求总结报告(原件PPT)

第1章 序言 第2章 引言 2.1 项目概述 2.1.1 项目背景 2.1.2 项目目标 2.2 编写目的 2.3 文档约定 2.4 预期读者及阅读建议 第3章 技术要求 3.1 软件开发要求 3.1.1 接口要求 3.1.2 系统专有技术 3.1.3 查询功能 3.1.4 数据安全 3.1.5 可靠性要求 3.1.6 稳定性要求 3.1.7 安全性…

车载入行:HIL测试、功能安全测试、CAN一致性测试、UDS测试、ECU测试、OTA测试、TBOX测试、导航测试、车控测试

FOTA模块中OTA的知识点&#xff1a;1.测试过程中发现哪几类问题&#xff1f; 可能就是一个单键的ecu&#xff0c;比如升了一个门的ecu&#xff0c;他的升了之后就关不上&#xff0c;还有就是升级组合ecu的时候&#xff0c;c屏上不显示进度条。 2.在做ota测试的过程中&#xff…