前言
生产者消费者模型(CP模型)是一种十分经典的设计,常常用于多执行流的并发问题中!很多书上都说他很高效,但高效体现在哪里并没有说明!本博客将详解!
目录
前言
一、生产者消费者模型
1.1 什么是生产者消费者模型?
1.2 生产者消费者模型的特点
1.3 生产者消费者模型的优点
二、基于阻塞队列实现生产者消费者模型
2.1 阻塞队列
2.2 单生产单消费模型
2.3 多生产多消费模型
三、POSIX 信号量
3.1 信号量的基本概念
3.2 信号量的相关操作
四、基于环形队列实现生产者消费者模型
4.1 环形队列
4.2 单生产单消费模型
4.3 多生产多消费模型
• 如何理解生产者消费者模型的效率高?
一、生产者消费者模型
1.1 什么是生产者消费者模型?
• 生产消费者模型(Producer-Consumer Model) 简称CP模型 是 多执行流 并发 的一个经典模型,主要是两个或者多个执行流(进程/线程)通过一个 容器 进行数据共享和通信的机制!
• 在该模型中: 生产者执行流 负责 向 "容器" 中生产数据; 消费者执行流 负责 从 "容器" 中消费数据;容器 一般是指 缓冲区
什么生产者、消费者、容器?感觉很复杂~!虽然听起来很难,但实际上一点也不简单!哈哈~开个玩笑!其实生产者和消费者模型还是比较简单的,OK,我们下面举个例子理解一下:
我们就以现实中的超市举例子:
超市的工作模式:
顾客只需要到超市购买,供应商只需要向超市供应!即顾客和供应商之间不需要见面,这就做到了很好的 解耦 !
超市工作模式的好处是,超市可以提前 缓存 大量的商品!正是超市的缓存机制,可以解决:
1、当顾客多,供应商少,即供货速度慢,消费速度快时,可以先让供应商提前向超市生产商品,再让你顾客进行消费!
2、当顾客少,供应商多,即供货速度快,消费速度慢时,可以先让顾客提前到超市消费,再让供应商生产商品!
这就做到了,允许生产消费的步调不一致,即协调生产者和消费者的 忙闲不均 !
在这个例子中,顾客就是消费者,供应商就是生产者,超市就是容器(提供交易的场所)!
当然,超市不可能只面向同一个顾客/供应商,而是被多个顾客和供应商同时看见的!也就是说,容器(交易场所)注定是被多执行流所看见的,即他是共享资源,在多线程环境中 ,共享资源被多执行流并发访问时是必须要保证安全的,如何保证? 同步和互斥!
在现实中,市场(超市中的货架位置)是有限的,多个同一货品的供应商,为了抢占市场,都会加大促销来排挤对手,例如泡面:某师傅与统某大战:
在竞争之下,势必有一家供应商失去市场,所以可以得出:生产者之间是典型的互斥关系!
消费者之间是互斥关系
消费者之间,给人的感觉好像是同步关系!但是实际上他们也是互斥的,比如,你和情敌到超市都想买一个红箭的口香糖,但是只有一个了!此时你们只能拼手速抢喽~!
生产者和消费者是同步和互斥关系!
比如说,有一天你想吃最喜欢的 脑残酸菜牛肉面 了, 去超市购买,结果没货了,你就失望的走了,回到宿舍刚躺下完了三分钟的手机,看到视屏里面的人在吃,你忍不了了,又跑到超市结果还没有!老板看见了你,说小伙子你加我微信吧,有货了我告诉你!于是你就回去等了,当超市供货商提供了在录入价格的期间,老板说小伙子过来买吧,现在有了!但是由于货物过多,你去的时候价格还没有录入完,老板让你等2分钟把价格录入后,再让你到货架拿 脑残酸菜牛肉面!此时体现的是互斥和同步,互斥是当没录入完价格时不允许你进入拿面,同步是没货时先让供货商供货,然后再让你购买,具有一定的顺序!
1.2 生产者消费者模型的特点
生产者消费者的模型特点,可以总结为321原则
3 种关系
• 生产者和生产者 -> 互斥
• 消费者和消费者 -> 互斥
• 生产者和消费者 -> 互斥&&同步
2 种角色
• 生产者
• 消费者
1 个交易场所
• 通常是一个特定的缓冲区(阻塞队列/环形队列)
注意 :这里的321原则并不是课本上提出的,而是我的恩师蛋哥总结的!
1.3 生产者消费者模型的优点
• 解耦
• 生产者在生产时,无需关注消费者的状态,只需关注交易场所中是否有空闲位置
• 消费者在消费时,无需关注生产者的状态,只需关注交易场所中是否有就绪数据
• 协调忙闲不均
• 当生产者速度慢,消费者速度快时,可以先让生产者先生产,再让消费者消费;
• 当生产者速度快,消费者速度慢时,可以先让消费者先消费,再让生产者生产;
• 并发效率高
这个后面实现了生产者消费者模型了解释!
二、基于阻塞队列实现生产者消费者模型
上面刚介绍了,生产者消费者模型中,要有一个交易场所,一般这个场所是 阻塞队列 或者 环形队列!两者的区别是,阻塞队列 是对这个场所整体性使用,而 环形队列 是对这个交易场所划分成多个小场所使用!我们先来介绍整体性使用的即阻塞队列!
2.1 阻塞队列
阻塞队列(Blocking Queue)是一种常用于生产者消费者模型的数据结构;是一种特殊的队列,具备 先进先出 FIFO 的特性,与普通的队列不同的是 阻塞队列 是大小固定的,也就是存在 容量的!阻塞队列 可以为空,也可以为满!
• 阻塞队列入队 -> 生产者进行生产,阻塞队列出队 -> 消费者进行消费;
• 阻塞队列为满时,进行对生产者阻塞;
• 阻塞队列为空时,进行对消费者阻塞;
是不是和 管道十分的相似!
• 当管道满了将写端阻塞,等读端读取了,即有空间了再让写端来写!
• 当管道为空,阻塞读端,当写端写了再让读端来读!
这也和我们当初介绍管道的特点之一:“管道内部自己维护了同步和互斥的机制”一致!
2.2 单生产单消费模型
我们先来实现一个最简单的,但生产单消费模型,首先搭建一个阻塞队列类框架!
我们将阻塞队列放在BlockingQueue.hpp中
首先,我们需要一个队列,可以和C语言一样手搓,但是今天STL中有现成的,所以队列就使用 std::queue 了,因为阻塞队列是有容量大小的,所以得使用一个整数记录容量!保障,队列满/空时的阻塞,所以得使用互斥锁和条件变量实现!
所以一个基本的框架如下:
#pragma once
#include <pthread.h>
#include <queue>
template <class T>
class BlockingQueue
{
private:
// 判断阻塞队列是否为空
bool IsEmpty()
{
// ...
}
// 判断阻塞队列是否为满
bool IsFull()
{
// ...
}
public:
// 构造
BlockingQueue(int cap = default_cap)
: _max_cap(cap)
{
pthread_mutex_init(&_mutex);
pthread_cond_init(&_cond);
}
// 析构
~BlockingQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
// 生产者 生产(入队)
void Push(const T& in)
{
// ...
}
// 消费者 消费(出队)
void Pop(T* out)
{
// ...
}
private:
std::queue<T> _block_queue;
int _max_cap; // 阻塞队列的容量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _cond; // 条件变量(存疑)
};
OK,大框架有了之后,我们现在的问题就是,把上述的接口实现好即可!我们一个个来:
如何判断阻塞队列为空/为满?
• 判断为空:只需判断std::queue是否为空即可!
• 判断为满:只需判断std::queue的size是否 和 容量相等!
// 判断阻塞队列是否为空
bool IsEmpty()
{
return _block_queue.empty();
}
// 判断阻塞队列是否为满
bool IsFull()
{
return _max_cap == _block_queue.size();
}
如何实现 生产者生产数据 即入队?
因为阻塞队列是整体使用的,所以当生产者生产时,消费者就不能动阻塞队列!而他们都可以看到阻塞队列,即阻塞队列是临界资源!如何保障临界资源同一时刻被一个执行流访问?当然是加一把互斥锁了!然后当队列不为满时,入队;否则等待!
如何保证等待?
使用条件变量!
但是现在的条件变量只有一个,而消费者和生产者都要等待,如果有一个条件变量的话,会使得编码很复杂,所以我们再加一个条件变量,让他们等到时在各自的条件下等待!
// 生产者 生产(入队)
void Push(const T &in)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否为满
if(IsFull())// if ?
{
// 在生产者的条件下等待
pthread_cond_wait(&_p_cond, &_mutex);
}
// 1、不为满 || 2、重新竞争到锁了
_block_queue.push(in);
// 解锁
pthread_mutex_unlock(&_mutex);
}
如何实现 消费者消费数据 即出队?
首先也是得加锁的,保证在消费者访问时,生产者不能打扰!其次当阻塞队列为空时,消费者应该在他的条件下等待!
// 消费者 消费(出队)
void Pop(T *out)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否为空
if(IsEmpty()) // if ?
{
// 在消费者的条件下等待
pthread_cond_wait(&_c_cond, &_mutex);
}
// 1、不为空 || 2、重新竞争到锁了
*out = _block_queue.front();
_block_queue.pop();
// 解锁
pthread_mutex_unlock(&_mutex);
}
现在有个尴尬的问题是:
如果生产者或消费者,因为原先的阻塞队列是满/空而等待时,对方虽然消费/生产了数据,即可以生产/消费了,而对方不知道!所以要想办法唤醒对方!如何唤醒呢?
知道是否可以消费的是生产者,因为当他执行完入队操作时,注定了队列中一定至少有一个元素,所以,让他唤醒消费者最合适,因为只有他可以确保队列中有数据!
同理,知道是否可以生产的一定是消费者,以为当他拿走一个数据后,至少队列中有一个位置可以生产(入队),所以让他唤醒生产者~!
所以,当生产者或消费者,执行完生产/消费时,应该唤醒阻塞的对方继续操作!
#pragma once
#include <pthread.h>
#include <queue>
template <class T>
class BlockingQueue
{
private:
// 判断阻塞队列是否为空
bool IsEmpty()
{
return _block_queue.empty();
}
// 判断阻塞队列是否为满
bool IsFull()
{
return _max_cap == _block_queue.size();
}
public:
// 构造
BlockingQueue(int cap = default_cap)
: _max_cap(cap)
{
pthread_mutex_init(&_mutex);
pthread_cond_init(&_c_cond);
pthread_cond_init(&_p_cond);
}
// 析构
~BlockingQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_init(&_c_cond);
pthread_cond_init(&_p_cond);
}
// 生产者 生产(入队)
void Push(const T &in)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否为满
if (IsFull()) // if ?
{
// 在生产者的条件下等待
pthread_cond_wait(&_p_cond, &_mutex);
}
// 1、不为满 || 2、重新竞争到锁了
_block_queue.push(in);
// 解锁
pthread_mutex_unlock(&_mutex);
// 唤醒阻塞的消费者
pthread_cond_signal(&_c_cond);
}
// 消费者 消费(出队)
void Pop(T *out)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否为空
if (IsEmpty()) // if ?
{
// 在消费者的条件下等待
pthread_cond_wait(&_c_cond, &_mutex);
}
// 1、不为空 || 2、重新竞争到锁了
*out = _block_queue.front();
_block_queue.pop();
// 解锁
pthread_mutex_unlock(&_mutex);
// 唤醒阻塞的生产者
pthread_cond_signal(&_p_cond);
}
private:
std::queue<T> _block_queue;
int _max_cap; // 阻塞队列的容量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _c_cond; // 消费者条件变量
pthread_cond_t _p_cond; // 生产者条件变量
};
单生产但单费阻塞队列,这样就封装好了!我们下面来实现一下上层的调用操作:
上层调用的代码写在test.cc中!生产者去向队列中写入整数(1~10的随机数),消费者从队列中拿出来打印~!
#include "BlockingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <ctime>
void* Consumer(void*args)
{
BlockingQueue<int> *bq = static_cast<BlockingQueue<int> *>(args);
while (true)
{
// 从阻塞队列获取数据
int data = 0;
bq->Pop(&data);
// 处理数据
std::cout << "Consumer -> " << data << std::endl;
}
return nullptr;
}
void* Producer(void*args)
{
BlockingQueue<int> *bq = static_cast<BlockingQueue<int> *>(args);
srand(time(nullptr) ^ getpid());
while (true)
{
// 生产数据
int data = rand() % 10 + 1;
bq->Push(data);
// 处理数据
std::cout << "Producer -> " << data << std::endl;
}
return nullptr;
}
int main()
{
// 创建一个阻塞队列
BlockingQueue<int> *bq = new BlockingQueue<int>();
// 创建两个线程
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, bq);// 消费者
pthread_create(&p, nullptr, Producer, bq);// 生产者
// 等待线程
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
}
我们先来直接运行:
此时生产者疯狂生产,消费者疯狂消费!也就是在两者疯狂的打印,不容易看到阻塞队列的特点,为了验证阻塞队列的特点,我们采用休眠的方式,验证:
1、消费者每一秒消费一次,生产者疯狂的生产
预期现象:生产者一次性把队列生产满,然后每个一秒消费者打印一次,生产者生产一次:
符合预期!
2、生产者每个一秒生产一个,消费者一直消费
预期现象:由于一开始生产者休眠,所以消费者阻塞,后面生产者隔一秒生产一个,同时唤醒消费者消费一个,所以就是隔一秒打印一个:
OK,符合预期~!
• 一些细节问题补充
虽然上面的单生产但消费的代码,已经可以跑起来了,但是里面还存在一些细节问题,下面我们来进行优化一下:
问题一:在阻塞队列中的Push/Pop中直接使用 if 判断条件是否满足,可能会出现问题!
理由如下:
1、pthread_cond_wait 函数可能调用失效,会造成误唤醒/伪唤醒的情况,如果此时是 if 则会继续向下走,可能导致非法的进行生产/消费!
2、如果是多线程情况下,只生产一个,而唤醒所有(伪唤醒),此时是 if,虽然只有一个持有锁的线程才可以访问,但是其他线程此时条件以满足,只是在锁的位置等待,此时会造成多个线程非法的进入临界区,造成资源的损坏!
如何解决?只需将 if 换成 while 即可!这样即使你造成了伪唤醒,在往下执行前会先检查!直到条件在往下继续执行:
// 生产者 生产(入队)
void Push(const T &in)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否为满
while (IsFull()) // if ?
{
// 在生产者的条件下等待
pthread_cond_wait(&_p_cond, &_mutex);
}
// 1、不为满 || 2、重新竞争到锁了
_block_queue.push(in);
// 解锁
pthread_mutex_unlock(&_mutex);
// 唤醒阻塞的消费者
pthread_cond_signal(&_c_cond);
}
// 消费者 消费(出队)
void Pop(T *out)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否为空
while (IsEmpty()) // if ?
{
// 在消费者的条件下等待
pthread_cond_wait(&_c_cond, &_mutex);
}
// 1、不为空 || 2、重新竞争到锁了
*out = _block_queue.front();
_block_queue.pop();
// 解锁
pthread_mutex_unlock(&_mutex);
// 唤醒阻塞的生产者
pthread_cond_signal(&_p_cond);
}
问题二:在Push/Pop后,需要唤醒对方来执行!唤醒在解锁前后有影响吗?
答案是:没有影响!唤醒对方,在解锁前后都可以!原因是:
1、如果在解锁前唤醒对方,对方没有锁,他会在锁那个位置等待对方解锁!
2、如果在解锁后唤醒对方,对方没有条件变量,在条件变量那里等对方唤醒!
所以在解锁前后唤醒对方是没有影响的!
问题三:阻塞队列的任务中只能放 int 这样的整数吗?
当然不是!我们写的是模板呀!数据类型是T,T可以是int这样的整数,当然也可以是自定义类的对象喽!
我们这里搞一个任务类,Task.hpp 让它实现加法
#pragma once
#include <iostream>
class Task
{
public:
Task(int x, int y)
:_x(x),_y(y)
{}
Task(){}
std::string debug()
{
return std::to_string(_x) + "+"+std::to_string(_y) +"=" +"?";
}
void Excute()
{
_result = _x + _y;
}
void operator()()
{
Excute();
}
std::string result()
{
std::string msg = std::to_string(_x) + "+"+std::to_string(_y) +"=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
此时我们可以:让生产者给两个数,让你消费者计算:
看效果:
格局打开,这里只是放了一个简单计算的任务,我们实际还可以放入更复杂的任务!
比如 网络请求、SQL查询、并行 IO尤其是 IO,使用 「生产者消费者模型」 可以大大提高效率,包括后面的 多路转接,也可以接入 「生产者消费者模型」 来提高效率!
2.3 多生产多消费模型
基于上面的介绍,我们可以实现多生产多消费模型了!其实,经过上面的修改,我们不需要修改,上面的代码直接可以适应多生产多消费的场景!
OK,我们先来实验一下:
int main()
{
// 创建一个阻塞队列
BlockingQueue<Task> *bq = new BlockingQueue<Task>();
// 创建两个线程
pthread_t c1, c2, p1, p2, p3;
pthread_create(&c1, nullptr, Consumer, bq); // 消费者
pthread_create(&c2, nullptr, Consumer, bq); // 消费者
pthread_create(&p1, nullptr, Producer, bq); // 生产者
pthread_create(&p2, nullptr, Producer, bq); // 生产者
pthread_create(&p3, nullptr, Producer, bq); // 生产者
// 等待线程
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
delete bq;
return 0;
}
当然这可能会造成屏幕的打印错乱问题,这是因为显示器本质也是文件,不同线程向同一个文件写入,显示器不就是临界资源吗?所以理论上也要对显示器作保护的!
• 为什么当前单生产单消费的代码不直接修改就可以适用于 多生产多消费场景 呢?
原因很简单:
生产者消费者、都是对同一个阻塞队列做操作,而阻塞队列是整体使用,即每次只允许一个持有锁的线程访问,所以即使多线程过来,也是得先竞争锁资源的!而互斥锁保证了他们是串行的~!
OK,以上就是基于阻塞队列实现的生产者消费者模型了!下面我们来介绍信号量和用信号量基于循环队列的生产者消费者模型!
三、POSIX 信号量
3.1 信号量的基本概念
互斥和同步 不是只能由 互斥锁和条件变量 实现,还能通过 信号量 sem 和 互斥锁实现(出自POSIX标准)
信号量 的本质是一个 计数器,描述临界资源中资源数目的计数器;
• 申请到资源,计数器 -- (P操作)
• 释放完资源,计数器 ++(V操作)
信号量是描述临界资源数目的,但他也是被所有的线程所共享,即信号量是临界资源,所以对信号量的 PV 操作必须是原子的!
如果我们把「生产者消费者模型」中某一临界资源 整体性使用,那他的信号量的值就是 1:
• sem 值为 1,表示线程可以生产/消费,执行sem--;
• sem 值为 0,表示线程不能生产/消费,只能阻塞等待;
此时的信号量只有两态,即 1 / 0 ,可以实现互斥锁的效果,即实现线程互斥!像这种只有两态的信号量被称为 二元信号量/二进制信号量
如果我们把「生产者消费者模型」中某一临界资源 分成N份使用,那信号量的值就是N:
• 当 sem == N 时,阻塞队列已经空了,消费者无法消费
• 当 sem == 0 时,阻塞队列已经满了,生产者无法生产
• 当 sem != 0 && sem != N 时,申请资源就绪,sem--;资源释放,sem++;
像这种信号量的值被初始化为N,的信号量被称为多元信号量/计数信号量
当一个线程想要访问临界资源时,就必须要申请信号量,当申请成功,继续执行;否则,就阻塞等待,直到有信号量资源可用!如此一来就可以和 条件变量 一样实现 同步 了!
其实 「信号量」 的工作机制类似于 买电影票,是一种 预订机制,只要你买到票了,即使你晚点到达电影院,你的位置也始终可用,买到票的本质是将对应的座位进行了预订!详见:Linux IPC-System V
3.2 信号量的相关操作
有了之前 互斥锁、条件变量 的使用基础,信号量 的接口学习是非常简单的,依旧是只有四个接口:初始化、销毁、申请、释放
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数
• sem 表示需要初始化的信号量的指针
• pshared 表示当前信号的共享状态,0表示线程间共享,1表示进程间共享
• value 表示信号量的初始值,可以设置多元/二元
返回值
成功返回
0
,失败返回-1
,并设置错误码注意:这一类函数的返回值都是一样的,后面不在介绍
销毁信号量
#include <semaphore.h>
int sem_destroy(sem_t *sem);
参数
待销毁的信号量指针
申请信号量(P操作--)
#include <semaphore.h>
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
主要使用 :int sem_wait(sem_t *sem);
参数
第一个:sem 表示从哪一个信号量中申请
第二个:尝试申请,如果没有申请到资源,就会返回;
第三个:尝试申请,如果没有申请到资源,就会返回,每隔一段时间进行申请,即
timeout
释放信号量(V操作++)
#include <semaphore.h>
int sem_post(sem_t *sem);
参数
将资源释放到哪个信号量中
这批接口属于是看一眼就会用,再多看一眼就会爆炸!所以我们直接来基于上述的接口实现生产者消费者模型!
四、基于环形队列实现生产者消费者模型
4.1 环形队列
生产者消费者模型 中的交易场所是可更换的,不仅可以使用 阻塞队列,还可以使用 环形队列 !
关于环形队列,这里不在多哔哔,我在数据结构的时候手撕过:DS线性表之栈和队列
数组实现的环形队列,麻烦的就是如何判断空和满?判断的方式有两种,以前我在博客中提到了第一种:
1、多开一个空间,当tail + 1 == head 时,表示满;当tail == head 的时候,表示空;
2、搞一个计数器,当计数器的值为 0
时,表示当前为空,当计数器的值为容量时,表示队列为满
这两种策略都可以确保 环形队列 正确判空和判满,至于这里肯定是选择策略二,因为 「信号量」 本身就是一个天然的计数器
在 环形队列 中,生产者 和 消费者 关心的资源不一样:生产者只关心是否有空间放数据,消费者只关心是否能从空间中取到数据;所以可以搞两个信号量分别标识生产者和消费者的资源数!
• 生产者信号量:标识当前有多少可用空间
• 消费者信号量:标识当前有多少数据
• 生产者的初始值为,环形队列的大小,消费者的初始值为0;
• 无论是生产者还是消费者,只有申请到自己的 「信号量」 资源后,才进行 生产 / 消费、
OK,有了上述的理解,就可以去实现了,我们还是先单生产但消费,然后多生产多消费!
4.2 单生产单消费模型
我们定义一个数组作为循环队列的底层缓冲区,定义两个信号量分别是空间和数据,定义生产者和消费者的下标!为了操作和理解,我们将sem_wait和sem_post封装成P和V:
#pragma once
#include <pthread.h>
#include <semaphore.h>
#include <vector>
const static int default_cap = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t& s)
{
sem_wait(&s);
}
void V(sem_t& s)
{
sem_post(&s);
}
public:
RingQueue(int cap = default_cap)
: _ring_queue(cap),_max_cap(cap),_c_step(0),_p_step(0)
{
sem_init(&_space, 0, _max_cap);
sem_init(&_data, 0, 0);
}
~RingQueue()
{
sem_destroy(&_space);
sem_destroy(&_data);
}
void Push(const T& in)
{
// 申请信号量
P(_space);
// 生产
_ring_queue[_p_step] = in;
_p_step++;
_p_step %= _max_cap;
// 释放信号量
V(_data);
}
void Pop(T* out)
{
// 申请信号量
P(_data);
// 消费
*out = _ring_queue[_c_step];
_c_step++;
_c_step %= _max_cap;
// 释放信号量
V(_space);
}
private:
std::vector<T> _ring_queue;
int _max_cap;//容量
sem_t _space;// 空间信号量
sem_t _data;// 数据信号量
int _c_step;// 消费者下标
int _p_step;// 生产者下标
};
我们还是先用数字测试:
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <ctime>
void *Consumer(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
while (true)
{
// 获取数据
int data = 0;
rq->Pop(&data);
// 处理数据
std::cout << "Consumer -> " << data << std::endl;
sleep(1);
}
return 0;
}
void *Producer(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
while (true)
{
// 生产数据
int data = rand() % 10 + 1;
rq->Push(data);
// 处理数据
std::cout << "Producer -> " << data << std::endl;
sleep(1);
}
return 0;
}
int main()
{
srand(time(nullptr));
RingQueue<int> *rq = new RingQueue<int>();
pthread_t c,p;
pthread_create(&c, nullptr, Producer,rq);
pthread_create(&p, nullptr, Consumer,rq);
// 等待线程
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
为了避免刷屏的效果,我们先各一秒,然后生产和消费:
让消费者一秒读取一次;预期现象:生产者先生成满,然后一秒过后,消费一个,生产一个:
当然,让生产者一秒生产一个,消费者不休眠,现象就是生成一个,消费一个:
细节问题:在没有 互斥锁 的情况下,是如何 确保生产者与消费者间的互斥关系的?
互斥:虽然这里没有直接使用互斥锁,但信号量的操作本身是原子的,这意味着在任何时刻,只有一个线程(生产者或消费者)可以修改信号量的值。因此,通过信号量的机制,间接实现了 生产者和消费者 对环形队列访问的 互斥。
同步:信号量的操作确保了生产者和消费者之间的同步。当队列满时,生产者必须阻塞等待消费者消费;当队列空时,消费者必须阻塞等待生产者生产。这种等待和唤醒的PV操作就是同步的体现!
当然我们可不止只会生产和消费整数,因为是模板所以,可以是任意的类型,我们呢可以把上面的阻塞队列的Task.hpp的任务拿过来直接测试:
#pragma once
#include <iostream>
class Task
{
public:
Task(int x, int y)
:_x(x),_y(y)
{}
Task(){}
std::string debug()
{
return std::to_string(_x) + "+"+std::to_string(_y) +"=" +"?";
}
void Excute()
{
_result = _x + _y;
}
void operator()()
{
Excute();
}
std::string result()
{
std::string msg = std::to_string(_x) + "+"+std::to_string(_y) +"=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
这里的运行结果与 阻塞队列 那边的一模一样,证明当前的 「生产者消费者模型」 没有问题(单生产单消费场景中)
注:如果想要提高并发度,可以增大环形队列的容量
4.3 多生产多消费模型
信号量的 PV 操作,保证的是 生产者和消费者 在任意时刻访问 循环队列 时只有一个线程可操作!也就是他保证的是 生产者和消费者之间 的互斥!
但是现在是,多执行流即多生产多消费,他们申请到信号量进行 生产/消费 时可能会出现问题,原因是:生产/消费的下标各是一个,也就是会造成对于同一资源的破坏!如何解决 生产和生产/消费和消费 之间的 互斥关系 呢?加互斥锁!
现在的问题是加几把锁?
答案是:加两把!因为生产者和消费者关注的资源是不一样!
阻塞队列 中为什么只需要一把锁?
因为阻塞队列中的共享资源是一整个队列,生产者和消费者访问的是同一份资源,所以一把锁就够了
#pragma once
#include <pthread.h>
#include <semaphore.h>
#include <vector>
const static int default_cap = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t& mutex)
{
pthread_mutex_lock(&mutex);
}
void UnLock(pthread_mutex_t& mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap = default_cap)
: _ring_queue(cap), _max_cap(cap), _c_step(0), _p_step(0)
{
sem_init(&_space, 0, _max_cap);
sem_init(&_data, 0, 0);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
~RingQueue()
{
sem_destroy(&_space);
sem_destroy(&_data);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
void Push(const T &in)
{
// 申请信号量
P(_space);
// 加锁
Lock(_p_mutex);
// 生产
_ring_queue[_p_step] = in;
_p_step++;
_p_step %= _max_cap;
// 解锁
UnLock(_p_mutex);
// 释放信号量
V(_data);
}
void Pop(T *out)
{
// 申请信号量
P(_data);
// 加锁
Lock(_c_mutex);
// 消费
*out = _ring_queue[_c_step];
_c_step++;
_c_step %= _max_cap;
// 解锁
UnLock(_c_mutex);
// 释放信号量
V(_space);
}
private:
std::vector<T> _ring_queue;
int _max_cap; // 容量
sem_t _space; // 空间信号量
sem_t _data; // 数据信号量
int _c_step; // 消费者下标
int _p_step; // 生产者下标
pthread_mutex_t _c_mutex; // 消费者互斥锁
pthread_mutex_t _p_mutex; // 生产者互斥锁
};
让消费者先休眠一秒,然后现象应该是 :先生成满,然后一秒消费一个,生产一个:
细节1: 在信号量申请成功之后 加锁,可以提高并发度
上述的,为了防止 生产和生产/消费和消费 对同一资源破坏,需要加互斥锁让他们串行!加锁的位置有两种:1、在申请信号量前加锁 2、在申请信号量以后加锁 这两种都是可以的!但是后者更优!原因如下:
这就好比,你去看电影,你是到时候进放映厅的时候再买票,还是先买票到时候直接进去?
当然是后者喽!原因是信号量的PV本身就是原子的,所以不会出错!所以可以提前申请好信号到时候竞争互斥锁串行访问,即可!
细节2:为什么在申请信号量的时候,不需要判断一下条件是否满足?
信号量本质就是一个资源数目的计数器!是一种资源的预定机制!
预定就体现在:可以不判断是否满足,就可以知道内部资源的情况!申请信号量本身就是在条件判断!
• 如何理解生产者消费者模型的效率高?
阻塞队列 中,每一个线程执行操作都必须先得加锁,也就是串行操作!循环队列 中,多线程即使并发申请到了信号量,最后也是得申请锁,串行执行操作的!这好像也没多高效吧!这样看确实!但是上面刚说了,我们生产者和消费者不只是处理的是 int 这样的整数,而是大多可能执行的是 网络请求、SQL查询、并行 IO 等,而请求和处理这些操作 本身是很费时间的!
当一个线程在 请求完正在处理这个任务 的同时(花费时间),其他线程去请求,这样不就大大的提高了 并发度 吗?不就是提高了,效率吗!而在这种比较费时间的操作下,加锁和解锁的时间也是可以忽略的!即书上所说的,生产者消费者模型的高效是体现在这里!
OK,好兄弟这就是CP模型的所有内容了,我是 cp 我们下期见~!