✨个人主页: 北 海
🎉所属专栏: Linux学习之旅
🎃操作环境: CentOS 7.6 腾讯云远程服务器
文章目录
- 🌇前言
- 🏙️正文
- 1、生产者消费者模型
- 1.1、什么是生产者消费者模型?
- 1.2、生产者消费者模型的特点
- 1.3、生产者消费者模型的优点
- 2、基于阻塞队列实现生产者消费者模型
- 2.1、阻塞队列
- 2.2、单生产单消费模型
- 2.3、多生产多消费模型
- 3、POSIX 信号量
- 3.1、信号量的基本知识
- 3.2、信号量相关操作
- 4、基于环形队列实现生产者消费者模型
- 4.1、环形队列
- 4.2、单生产单消费模型
- 4.3、多生产多消费模型
- 🌆总结
🌇前言
生产者消费者模型(CP模型)是一种非常经典的设计,常常出现在各种 「操作系统」 书籍中,深受教师们的喜爱;这种模型在实际开发中还被广泛使用,因为它在多线程场景中是十分高效的!
🏙️正文
1、生产者消费者模型
1.1、什么是生产者消费者模型?
「生产者消费者模型」是通过一个容器来解决生产者与消费者的强耦合关系,生产者与消费者之间不直接进行通讯,而是利用 「容器」来进行通讯
生产者?消费者?容器?耦合?晦涩难懂的名词难免让人打起退堂鼓,其实它们都很好理解,比如接下来我们可以借助一个 超市 的例子来深刻理解 生产者消费者模型
可以先忘掉之前的定义,接下来让我们看看 心连心超市 的工作模式
超市的工作模式
- 超市从工厂进货,工厂需要向超市提供商品
- 顾客在超市选购,超市需要向顾客提供商品
超市盈利的关键在于 平衡顾客与工厂间的供需关系
简单来说就是要做到 顾客可以在超市买到想要购买的商品,工厂也能同超市完成足量的需求订单,满足条件后,超市就可以盈利了
超市盈利的同时可以给供给双方带来便利
- 顾客不需要跑到工厂购买商品
- 工厂也不需要将商品配送到顾客手中
这就叫做 解决生产者与消费者间的强耦合关系
得益于 超市 做缓冲区,整个 生产消费 的过程十分高效,即便顾客没有在超市中找到想要的商品,也可借助超市之手向工厂进行反映,从而生产对应的商品,即 允许生产消费步调不一致
现实中的 超市工作模式 就是一个生动形象的 「生产者消费者模型」
- 顾客 -> 「消费者」
- 工厂 -> 「生产者」
- 超市 -> 「交易场所(容器)」
生产者消费者模型的本质:忙闲不均
其中的 「交易场所」 是进行 生产消费 的容器,通常是一种特定的 缓冲区,常见的有 阻塞队列 和 环形队列
超市不可能只面向一个顾客及一个工厂,「交易场所」 也是如此,会被多个 生产者消费者(多个线程) 看到,也就是说 「交易场所」 注定是一个共享资源;在多线程环境中,需要保证 共享资源被多线程并发访问时的安全(详见 Linux多线程【线程互斥与同步】)
回归现实中,多个工厂供应同一种商品时,为了抢占更多的市场,总会通过一些促销手段来排除竞品,比如经典的 泡面巨头 <康师傅与统一> 的大战,市场(超市中的货架位置)是有限的,在工厂竞争之下,势必有一家工厂失去市场,因此可以得出 生产者与生产者之前需要维持 「互斥」 关系
生产者与生产者:「互斥」
张三和李四在超市偶遇,俩人同时看中了 「快乐牌刀片」,但最近超市货源紧张,这个商品仅有一份,张三李四互不谦让,都在奋力争夺这个商品,显然当商品只有一份时 消费者与消费者之间也需要维护 「互斥」关系
消费者与消费者:「互斥」
某天张三又来到了超市,打算购买他最喜欢的 老坛酸菜牛肉面,但好巧不巧,超市的最后一桶 老坛酸菜牛肉面 已经售出,张三只能通知超市进行备货,超市老板记下了这个需求,张三失落的回了家,刚到家,张三的肚子就饿的咕咕叫,十分想念 老坛酸菜牛肉面,于是火速赶往超市,看看超市是否有货,答案是没有,法外狂徒张三是一个执着的人,总是反复跑到超市查看是否有货,导致张三这一天什么事也干不成,只想着自己的 老坛酸菜牛肉面;其实张三不必这样做,只需要在第一次告诉超市老板自己的需求,并添加老板的联系方式,让老板在商品备货完成后通知张三前来购买,将商品信息同步给消费者,这样可以避免张三陷入循环,同理对于工厂来说,超市老板也应该添加工厂负责人的联系方式,将商品信息同步给生产者,也就是说 生产者与消费者之间存在 「同步」关系;除此之外,在超市备货期间,张三是不能来购买的,即 生产者与消费者之间还存在 「互斥」关系
生产者与消费者:同步、互斥
注意: 生产者与消费者之间的「互斥」关系不是必备的,目的是为了让 生产、消费 之间存在顺序
「生产者消费者模型」是一个存在 生产者、消费者、交易场所 三个条件,以及不同角色间的 同步、互斥 关系的高效模型
1.2、生产者消费者模型的特点
「生产者消费者模型」 的最根本特点是 321原则
3
种关系
- 生产者与生产者:互斥
- 消费者与消费者:互斥
- 生产者与消费者:互斥与同步
2
种角色
- 生产者
- 消费者
1
个交易场所
- 通常是一个特定的缓冲区(阻塞队列、环形队列)
注:321
原则并非众所周知的概念,仅供辅助记忆 「生产者消费者模型」的特点
任何 「生产者消费者模型」 都离不开这些必备特点
生产者与消费者间的同步关系
- 生产者不断生产,交易场所堆满商品后,需要通知消费者进行消费
- 消费者不断消费,交易场所为空时,需要通知生产者进行生产
通知线程需要用到条件变量,即维护 同步 关系
其实之前在 Linux 进程间通信 【管道通信】 中学习到的 管道 本质上就是一个天然的 「生产者消费者模型」,因为它允许多个进程同时访问,并且不会出现问题,意味着它维护好了 「互斥、同步」 关系;当写端写满管道时,无法再写,通知读端进行读取;当管道为空时,无法读取,通知写端写入数据
1.3、生产者消费者模型的优点
「生产者消费者模型」为何高效?
- 生产者、消费者 可以在同一个交易场所中进行操作
- 生产者在生产时,无需关注消费者的状态,只需关注交易场所中是否有空闲位置
- 消费者在消费时,无需关注生产者的状态,只需关注交易场所中是否有就绪数据
- 可以根据不同的策略,调整生产者于与消费者间的协同关系
「生产者消费者模型」可以根据供需关系灵活调整策略,做到 忙闲不均
除此之外,「生产者消费者模型」 划分出了三个不同的条件:生产者、消费者、交易场所 各司其职,可以根据具体需求自由设计,很好地做到了 解耦,便于维护和扩展
2、基于阻塞队列实现生产者消费者模型
2.1、阻塞队列
编写 「生产者消费者模型」 需要用到 Linux 互斥与同步 的知识,这里先选择 阻塞队列 作为交易场所进行实现,在正式编写代码前,需要先认识一下 阻塞队列
阻塞队列 Blocking Queue
是一种特殊的队列,作为队列家族的一员,它具备 先进先出 FIFO
的基本特性,与普通队列不同的是: 阻塞队列 的大小是固定的,也就说它存在 容量 的概念
阻塞队列可以为空,也可以为满
将其带入 「生产者消费者模型」 中,入队 就是 生产商品,而 出队 则是 消费商品
- 阻塞队列为满时:无法入队 -> 无法生产(阻塞)
- 阻塞队列为空时:无法出队 -> 无法消费(阻塞)
是不是感觉跟 管道 十分相像,至于如何处理队空/队满的特殊情况,就需要借助 「互斥、同步」 相关知识了,具体在代码中体现
2.2、单生产单消费模型
首先来实现最简单的 单生产单消费者模型,首先搭好 阻塞队列类 的框架
创建
BlockingQueue.hpp
头文件
#pragma once
#include <queue>
#include <mutex>
#include <pthread.h>
// 命名空间,避免冲突
namespace Yohifo
{
#define DEF_SIZE 10
template<class T>
class BlockQueue
{
public:
BlockQueue(size_t cap = DEF_SIZE)
:_cap(cap)
{
// 初始化锁与条件变量
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~BlockQueue()
{
// 销毁锁与条件变量
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
// 生产数据(入队)
void Push(const T& inData);
// 消费数据(出队)
void Pop(T* outData);
private:
// 判断是否为满
bool IsFull();
// 判断是否为空
bool IsEmpty();
private:
std::queue<T> _queue;
size_t _cap; // 阻塞队列的容量
pthread_mutex_t _mtx; // 互斥锁(存疑)
pthread_cond_t _cond; // 条件变量(存疑)
};
}
如何判断阻塞队列是否为空:判断 queue
是否为空
如何判断阻塞队列是否为满:判断 queue
的大小是否为 _cap
使用 互斥锁 + 条件变量 实现互斥与同步
获得工具框架后,接下来搭建 生产与消费 的代码
因为是 单生产、单消费,只需要手动创建两个线程即可
创建
cp.cc
源文件
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include "BlockingQueue.hpp"
void* Producer(void *args)
{
Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);
while(true)
{
// 1.生产商品(通过某种渠道获取数据)
// ...
// 2.将商品推送至阻塞队列中
// bq->Push(data);
}
pthread_exit((void*)0);
}
void* Consumer(void *args)
{
Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);
while(true)
{
// 1.从阻塞队列中获取商品
// bq->Pop(&data);
// 2.消费商品(结合某种具体业务进行处理)
// ...
}
pthread_exit((void*)0);
}
int main()
{
// 创建一个阻塞队列
Yohifo::BlockQueue<int>* bq = new Yohifo::BlockQueue<int>;
// 创建两个线程(生产者、消费者)
pthread_t pro, con;
pthread_create(&pro, nullptr, Producer, bq);
pthread_create(&con, nullptr, Consumer, bq);
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
delete bq;
return 0;
}
注意:
- 生产者、消费者需要看到同一个阻塞队列,可以通过线程的回调函数参数进行传递
- 其他具体实现细节仍需填充
以上就是 「生产者消费者模型」 所需要的大体框架,具体细节实现可以接着往下看,不过在这之前需要先理解 为什么生产、为什么消费
数据就像能量一样,不会凭空产生,也不会凭空消失,因此生产者线程在生产 商品(数据) 时,一定是从某种渠道获取的,比如客户发出的 HTTP
请求、程序猿发出的 SQL
语句、涉及复杂运算的计算任务等,总之 生产者需要先获取数据,才能将其放入阻塞队列中,等待处理
同理,消费者线程在获取 商品(数据) 后,也需要结合业务逻辑做出不同的动作,比如根据 HTTP
请求进行响应、返回 SQL
查询结果、返回计算结果等,一句话总结:生产者生产商品、消费者消费商品都是需要时间的,并非单纯地对阻塞队列进行操作
这是一个十分重要的概念,它能帮助我们正确看待 生产者、消费者 的作用,这是被大多数教材忽略的重要概念
现在可以补充 BlockingQueue.hpp
和 cp.cc
具体细节了,首先来看看 BlockingQueue.hpp
的实现
BlockQueue
的成员变量问题(互斥锁、条件变量如何分配)
在 「生产者消费者模型」 中,有 满、空 两个条件,这两个条件是 绝对互斥 的,不可能同时满足,「生产者」关心是否为满,「消费者」关心是否为空,两者关注的点不一样,也就是说不能只使用一个条件变量来控制两个条件,而是需要 一个生产者条件变量、一个消费者条件变量
BlockQueue(size_t cap = DEF_SIZE)
:_cap(cap)
{
// 初始化锁与条件变量
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_pro_cond, nullptr);
pthread_cond_init(&_con_cond, nullptr);
}
~BlockQueue()
{
// 销毁锁与条件变量
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_pro_cond);
pthread_cond_destroy(&_con_cond);
}
std::queue<T> _queue;
size_t _cap; // 阻塞队列的容量
pthread_mutex_t _mtx; // 互斥锁
pthread_cond_t _pro_cond; // 生产者条件变量
pthread_cond_t _con_cond; // 消费者条件变量
创建两个条件变量是阻塞队列的精髓之一
条件变量需要两个,锁是否也需要两把呢?
答案是不需要,因为无论是「生产者」还是「消费者」,它们需要看到同一个阻塞队列,因此使用一把互斥锁进行保护就行了
BlockQueue
的Push
、Pop
函数简单实现
首先来看看 Push
函数的简单实现,Push
函数的功能就是将传入的数据 inData
添加到 阻塞队列
中,因为 阻塞队列 是一个 临界资源,在访问前必须加锁
// 生产数据(入队)
void Push(const T& inData)
{
// 加锁
pthread_mutex_lock(&_mtx);
_queue.push(inData);
pthread_mutex_unlock(&_mtx);
}
这样写只是对 普通临界资源 的访问,但这里是 阻塞队列,插入数据的前提是 有空间,不然是要被 阻塞 的,所以我们在进行数据插入前,应该先判断条件是否满足,不满足就得 阻塞等待条件满足
// 生产数据(入队)
void Push(const T& inData)
{
// 加锁
pthread_mutex_lock(&_mtx);
// 判断条件是否满足
if(IsFull())
{
pthread_cond_wait(&_pro_cond, &mtx);
}
_queue.push(inData);
pthread_mutex_unlock(&_mtx);
}
// 判断是否为满
bool IsFull()
{
return _queue.size() == _cap;
}
当条件不满足时,生产者线程进入等待状态
这里可以解释一下为什么需要给 pthread_cond_wait
函数传入互斥锁
- 首先要明白,判断条件是否满足,是在临界区内进行的,也就是说当前线程持有锁
- 当条件不满足时,当前线程进入条件等待状态,也就意味着它现在无法向后运行,将锁释放
- 此时其他线程就得不到锁资源了,程序进入了死锁状态
- 解决方法就是 将锁资源传递给
pthread_cond_wait
函数,使其拥有 「释放锁、获取锁」的能力,这样就能保证不会出现 死锁
这就是 同步 能解决 死锁 问题的关键,因为它可以主动让出 锁资源
过了一段时间,当条件满足时(消费者已经消费数据了),代码从 pthread_cond_wait
函数之后继续运行,生产者可以正常进行生产(可以确保一定有空位),一切看起来似乎很和谐,但此时存在一个致命问题:如果是「消费者」先阻塞(阻塞队列为空),「生产者」正常进行生产,当生产满后,「生产者」也进入了阻塞状态,此时就尴尬了,彼此都陷入了阻塞等待状态
造成此问题的根本原因是:「生产者」在生产结束后没有唤醒「消费者」,让其进行正常消费,所以在 生产完成 后,需要唤醒 「消费者」 进行消费
// 生产数据(入队)
void Push(const T& inData)
{
// 加锁
pthread_mutex_lock(&_mtx);
// 判断条件是否满足
if(IsFull())
{
pthread_cond_wait(&_pro_cond, &mtx);
}
_queue.push(inData);
// 可以加策略唤醒,比如生产一半才唤醒消费者
pthread_cond_signal(&_con_cond);
pthread_mutex_unlock(&_mtx);
}
单生产、单消费场景中的 Push
可以这样写,其他场景就需要稍微进行修改
注意: 生产者唤醒的是消费者,也就是需要传递 _con_cond
当消费者没有
wait
等待,生产者仍然进行唤醒时,是否会出现问题?
答案是不会,唤醒一个没有wait
的线程是不会有影响的,同时因为唤醒线程这个操作不需要加锁保护(本身就持有锁资源句柄),pthread_cond_signal
函数可以放到pthread_mutex_unlock
语句之后
有了 Push
的实现经验后,Pop
的实现就很简单了,依葫芦画瓢,简单实现如下:
// 消费数据(出队)
void Pop(T* outData)
{
// 加锁
ptherad_mutex_lock(&_mtx);
if(IsEmpty())
{
pthread_cond_wait(&_con_cond, &_mtx);
}
*outData = _queue.front();
_queue.pop();
// 可以加策略唤醒,比如消费完后才唤醒生产者
pthread_cond_signal(&_pro_cond);
pthread_mutex_unlock(&_mtx);
}
// 判断是否为空
bool IsEmpty()
{
return _queue.empty();
}
这种写法也只适用于单生产、单消费场景中
注意: 消费者唤醒的是生产者,需要传递 _pro_cond
cp.cc
的使用填充
有了 「生产者消费者模型」 后,就可以进行简单使用了
因为这里没有具体的业务场景,所以我们就使用 生成一个随机数 作为待插入的数据,打印数字 作为获取数据后的操作
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include "BlockingQueue.hpp"
void* Producer(void *args)
{
Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);
while(true)
{
// 1.生产商品(通过某种渠道获取数据)
int num = rand() % 10;
// 2.将商品推送至阻塞队列中
bq->Push(num);
std::cout << "Producer 生产了一个数据: " << num << std::endl;
std::cout << "------------------------" << std::endl;
}
pthread_exit((void*)0);
}
void* Consumer(void *args)
{
Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);
while(true)
{
// 1.从阻塞队列中获取商品
int num;
bq->Pop(&num);
// 2.消费商品(结合某种具体业务进行处理)
std::cout << "Consumer 消费了一个数据: " << num << std::endl;
std::cout << "------------------------" << std::endl;
}
pthread_exit((void*)0);
}
int main()
{
// 种 种子
srand((size_t)time(nullptr));
// 创建一个阻塞队列
Yohifo::BlockQueue<int>* bq = new Yohifo::BlockQueue<int>;
// 创建两个线程(生产者、消费者)
pthread_t pro, con;
pthread_create(&pro, nullptr, Producer, bq);
pthread_create(&con, nullptr, Consumer, bq);
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
delete bq;
return 0;
}
此时可以编译并运行程序,可以看到 生产者疯狂生产,消费者疯狂消费
这样不容易观察到 阻塞队列 的特点,我们可以通过 睡眠 的方式模拟效果
策略1:消费者每隔一秒消费一次,生产者疯狂生产
应该观察到的现象是 生产者很快就把阻塞队列填满了,只能阻塞等待,1
秒之后,消费者进行消费,消费结束后唤醒生产者,两者进入协同状态:生产者生产一个数据、消费者消费一个数据
void* Consumer(void *args)
{
Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);
while(true)
{
// 消费者每隔一秒进行一次消费
sleep(1);
// 1.从阻塞队列中获取商品
int num;
bq->Pop(&num);
// 2.消费商品(结合某种具体业务进行处理)
std::cout << "Consumer 消费了一个数据: " << num << std::endl;
std::cout << "------------------------" << std::endl;
}
pthread_exit((void*)0);
}
程序运行结果符合预期
策略2:生产者每隔一秒生产一次,消费者不断消费
预期结果为 刚开始阻塞队列为空,消费者无法进行消费,只能阻塞等待,一秒后,生产者生产了一个数据,并立即通知消费者进行消费,两者协同工作,消费者消费的就是生产者刚刚生产的数据
void* Producer(void *args)
{
Yohifo::BlockQueue<int>* bq = static_cast<Yohifo::BlockQueue<int>*>(args);
while(true)
{
// 生产者每隔一秒生产一次
sleep(1);
// 1.生产商品(通过某种渠道获取数据)
int num = rand() % 10;
// 2.将商品推送至阻塞队列中
bq->Push(num);
std::cout << "Producer 生产了一个数据: " << num << std::endl;
std::cout << "------------------------" << std::endl;
}
pthread_exit((void*)0);
}
运行结果如下
两种策略都符合预期,证明当前的 「生产者消费者模型」 是可用的(单生产单消费场景中)
2.3、多生产多消费模型
在上面的 「生产者消费者模型」 中,存在一些细节问题
细节1:只有当条件满足时,才能进行 生产/消费
之前单纯使用一个 if
进行判断过于草率
理由如下:
pthread_cond_wait
函数可能调用失败(误唤醒、伪唤醒),此时如果是if
就会向后继续运行,导致在条件不满足的时候进行了 生产/消费- 在多线程场景中,可能会使用
pthread_cond_broadcast
唤醒所有等待线程,如果在只生产了一个数据的情况下,唤醒所有线程,会导致只有一个线程进行了合法操作,其他线程都是非法操作了
关于当前代码使用
if
判读,在多线程环境中广播pthread_cond_broadcast
的理解
这就好比食堂里有很多人等待出餐,当阿姨仅做好一份饭后,就通知所有同学过来取餐,直接导致其他同学白跑一趟;带入程序中,直接影响就是 生产者/消费者 在 队列满/队列空 的情况下,仍然进行了 数据生产/数据消费
所以需要把条件判断改成 while
,直到条件满足后,才向后运行
// 生产数据(入队)
void Push(const T& inData)
{
// 加锁
pthread_mutex_lock(&_mtx);
// 循环判断条件是否满足
while(IsFull())
{
pthread_cond_wait(&_pro_cond, &_mtx);
}
_queue.push(inData);
// 可以加策略唤醒,比如生产一半才唤醒消费者
pthread_cond_signal(&_con_cond);
pthread_mutex_unlock(&_mtx);
}
// 消费数据(出队)
void Pop(T* outData)
{
// 加锁
pthread_mutex_lock(&_mtx);
// 循环判读条件是否满足
while(IsEmpty())
{
pthread_cond_wait(&_con_cond, &_mtx);
}
*outData = _queue.front();
_queue.pop();
// 可以加策略唤醒,比如消费完后才唤醒生产者
pthread_cond_signal(&_pro_cond);
pthread_mutex_unlock(&_mtx);
}
细节2:生产者消费者模型的高效体现在 「解耦」
生产、消费 的过程是加锁的、串行化执行,可能有的人无法 get
到 「生产者消费者模型」 的高效,这是因为没有对 「生产者消费者模型」 进行一个全面的理解
单纯的向队列中放数据、从队列中取数据本身效率就很高,但 生产者从某种渠道获取数据、消费者获取数据后进行某种业务处理,这是效率比较低的操作,「生产者消费者模型」 做到了这两点
1.消费者在进行业务处理时,生产者可以直接向队列中 push
数据
比如 消费者 在获取到数据后,需要进行某种高强度的运算,当然这个操作与 生产者 是没有任何关系的,得益于 阻塞队列 作为缓冲区,生产者 可以在 消费者 进行运算时 push
数据
这就好比你买了一桶泡面回家吃,厂商并不需要关心你吃完没有,直接正常向超市供货就行了
2.生产者在进行数据生产时,消费者可以直接向队列中 pop
数据
同上,消费者 不需要关心 生产者 的状态,只要 阻塞队列 中还有数据,正常 pop
获取就行了;也就是说你在超市购物时,无需关心工厂的生产情况,因为这与你无关
一句话总结:生产者不必关心消费者的消费情况,消费者也不需要关心生产者的生产情况
而这就是 「生产者消费者模型」 高效的体现,也是对模型的全面理解
这有点像 「冯·诺依曼体系结构」 中的 内存,扮演着中间商的角色,使得 CPU
能和 外设 协同高效工作
细节3:阻塞队列中不止能放 int
,还能放对象
对象包罗万物,可玩性非常高,比如这里增加一个简单 Task
任务类,实现基本的两数运算
创建
Task.hpp
头文件
#pragma once
#include <string>
namespace Yohifo
{
// 支持泛型
template<class T>
class Task
{
public:
Task(T x = 0, T y = 0, char op = '+')
:_x(x), _y(y), _op(op), _res(0), _err(0)
{}
// 重载运算操作
void operator()()
{
// 简单计算
switch(_op)
{
case '+':
_res = _x + _y;
break;
case '-':
_res = _x - _y;
break;
case '*':
_res = _x * _y;
break;
case '/':
if(_y == 0)
_err = -1;
else
_res = _x / _y;
break;
case '%':
if(_y == 0)
_err = -2;
else
_res = _x % _y;
break;
default:
_err = 1;
break;
}
}
// 获取计算结果
std::string GetResult()
{
// 根据错误标识,返回计算结果
std::string ret = std::to_string(_x) + " " + _op + " " + std::to_string(_y);
if(_err)
{
ret += " error";
// 判读是 / 错误还是 % 错误
if(_res == -1)
ret += " [-1] \t / 0 引发了错误";
else
ret += " [-2] \t % 0 引发了错误";
}
else
{
ret += " = " + std::to_string(_res);
}
return ret;
}
private:
T _x;
T _y;
char _op; // 运算符
T _res; // 结果
int _err; // 错误标识
};
}
得到这样一个任务类后,就可以更改 cp.cc
中生产者、消费者线程的处理逻辑了
这里就简单修改,随机获取两个数和一个运算符,并计算出结果
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include "BlockingQueue.hpp"
#include "Task.hpp"
void* Producer(void *args)
{
Yohifo::BlockQueue<Yohifo::Task<int>>* bq = static_cast<Yohifo::BlockQueue<Yohifo::Task<int>>*>(args);
// 运算符集
std::string opers = "+-*/%";
while(true)
{
// 生产者每隔一秒生产一次
sleep(1);
// 1.生产商品(通过某种渠道获取数据)
// 随机获取两个数(可以改为输入)
int x = rand() % 100;
int y = rand() % 100;
// 随机获取一种运算符
char ops[] = {'+', '-', '*', '/', '%'};
char op = opers[rand() % opers.size()];
// 2.将商品推送至阻塞队列中
// 创建匿名对象,并 Push 入阻塞队列中
bq->Push(Yohifo::Task<int>(x, y, op));
std::cout << "Producer 生产了: " << x << " " << y << " " << op << " 构成的对象" << std::endl;
std::cout << "----------------------------" << std::endl;
}
pthread_exit((void*)0);
}
void* Consumer(void *args)
{
Yohifo::BlockQueue<Yohifo::Task<int>>* bq = static_cast<Yohifo::BlockQueue<Yohifo::Task<int>>*>(args);
while(true)
{
// 1.从阻塞队列中获取商品
Yohifo::Task<int> task;
bq->Pop(&task);
// 进行业务处理
task();
std::string ret = task.GetResult();
// 2.消费商品(结合某种具体业务进行处理)
std::cout << "Consumer 消费了一个对象,并获得结果: " << ret << std::endl;
std::cout << "===========================" << std::endl;
}
pthread_exit((void*)0);
}
int main()
{
// 种 种子
srand((size_t)time(nullptr));
// 创建一个阻塞队列
Yohifo::BlockQueue<Yohifo::Task<int>>* bq = new Yohifo::BlockQueue<Yohifo::Task<int>>;
// 创建两个线程(生产者、消费者)
pthread_t pro, con;
pthread_create(&pro, nullptr, Producer, bq);
pthread_create(&con, nullptr, Consumer, bq);
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
delete bq;
return 0;
}
为了避免打印到显示器时的格式错乱问题(屏幕也是临界资源,理论上也需要加锁保护),这里让 生产者 每隔一秒生产一次,进而控制 消费速度
这里故意把 _y
指定为 0
,查看运算出错的情况
格局打开,这里只是放了一个简单计算的任务,我们实际还可以放入更复杂的任务,比如 网络请求、SQL
查询、并行 IO
尤其是 IO
,使用 「生产者消费者模型」 可以大大提高效率,包括后面的 多路转接,也可以接入 「生产者消费者模型」 来提高效率
OK,现在可以尝试修改代码以适应 多生产多消费场景 了
需要改吗?不需要,至少在当前的代码设计中,我们的代码完全可以应付 多线程多消费
接下来在原有代码的基础上,直接多创建几个线程
int main()
{
// 种 种子
srand((size_t)time(nullptr));
// 创建一个阻塞队列
Yohifo::BlockQueue<Yohifo::Task<int>>* bq = new Yohifo::BlockQueue<Yohifo::Task<int>>;
// 创建多个线程(生产者、消费者)
pthread_t pro[2], con[3];
for(int i = 0; i < 2; i++)
pthread_create(pro + i, nullptr, Producer, bq);
for(int i = 0; i < 3; i++)
pthread_create(con + i, nullptr, Consumer, bq);
for(int i = 0; i < 2; i++)
pthread_join(pro[i], nullptr);
for(int i = 0; i < 3; i++)
pthread_join(con[i], nullptr);
delete bq;
return 0;
}
运行结果如下,可以看到,确实有多个线程在运行,运行结果也没有问题
为什么当前代码设计中不需要修改就能适用于 多生产多消费场景 呢?
原因有两点:
- 生产者、消费者都是在对同一个
_queue
操作,用一把锁,保护一个临界资源,足够了 - 当前的
_queue
始终是被当作一个整体使用的,无需再增加锁区分
其实分别给 生产者和消费者 各配一把锁也是可以的,但在当前代码设计中(使用同一个 _queue
),完全没有必要
以上就是关于 基于阻塞队列实现「生产者消费者模型」的全部内容了,除了使用互斥锁外,还可以使用信号量,也就是使用环形队列来实现 「生产者消费者模型」
3、POSIX 信号量
3.1、信号量的基本知识
互斥、同步 不只能通过 互斥锁、条件变量 实现,还能通过 信号量 sem
、互斥锁 实现(出自 POSIX
标准)
「信号量」 的本质就是一个 计数器
- 申请到资源,计数器
--
(P
操作) - 释放完资源,计数器
++
(V
操作)
「信号量」 的 PV
操作都是原子的,假设将 「信号量」 的值设为 1
,用来表示 「生产者消费者模型」 中 阻塞队列 _queue
的使用情况
- 当
sem
值为1
时,线程可以进行 「生产 / 消费」,sem--
- 当
sem
值为0
时,线程无法进行 「生产 / 消费」,只能阻塞等待
此时的 「信号量」 只有两种状态:1
、0
,可以实现类似 互斥锁 的效果,即实现 线程互斥,像这种只有两种状态的信号量称为 「二元信号量」
「信号量」 不止可以用于 互斥,它的主要目的是 描述临界资源中的资源数目,比如我们可以把 阻塞队列 切割成 N
份,初始化 「信号量」 的值为 N
,当某一份资源就绪时,sem--
,资源被释放后,sem++
,如此一来可以像 条件变量 一样实现 同步
- 当
sem == N
时,阻塞队列已经空了,消费者无法消费 - 当
sem == 0
时,阻塞队列已经满了,生产者无法生产
用来实现 互斥、同步 的信号量称为 「多元信号量」
综上所述,在使用 「多元信号量」 访问资源时,需要先申请 「信号量」,只有申请成功了才能进行资源访问,否则会进入阻塞等待,即当前资源不可用
在实现 互斥、同步 时,该如何选择?
结合业务场景进行分析,如果待操作的共享资源是一个整体,比较适合使用 互斥锁+条件变量 的方案,但如果共享资源是多份资源,使用 信号量 就比较方便
其实 「信号量」 的工作机制类似于 买电影票,是一种 预订机制,只要你买到票了,即使你晚点到达电影院,你的位置也始终可用,买到票的本质是将对应的座位进行了预订(详见 Linux进程间通信【消息队列、信号量】)
对于 「信号量」 的第一层理解:申请信号量实际是一种资源预订机制
只要申请 「信号量」 成功了,就一定可以访问临界资源
如果将 「信号量」 实际带入我们之前写的 「生产者消费者模型」 代码中,是不需要进行资源条件判断的,因为 「信号量」本身就已经是资源的计数器了
对于 「信号量」 的第二层理解:使用信号量时,就已经把资源条件判断转化成了信号量的申请行为
比如可以直接这样写
// 生产数据(入队)
void Push(const T& inData)
{
// 申请信号量 P操作
// ...
_queue.push(inData);
// ...
// 释放信号量 V操作
}
3.2、信号量相关操作
有了之前 互斥锁、条件变量 的使用基础,信号量 的接口学习是释放简单的,依旧是只有四个接口:初始化、销毁、申请、释放
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数1:需要初始化的信号量,sem_t
实际就是一个联合体,里面包含了一个 char
数组,以及一个 long int
成员
typedef union
{
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;
参数2:表示当前信号量的共享状态,传递 0
表示线程间共享,传递 非0
表示进程间共享
参数3:信号量的初始值,可以设置为双元或多元信号量
返回值:初始化成功返回 0
,失败返回 -1
,并设置错误码
销毁信号量
#include <semaphore.h>
int sem_destroy(sem_t *sem);
参数:待销毁的信号量
返回值:成功 0
,失败 -1
, 并设置错误码
申请信号量(等待信号量)
#include <semaphore.h>
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
主要使用 sem_wait
参数:表示从哪个信号量中申请
返回值:成功返回 0
,失败返回 -1
,并设置错误码
其他两种申请方式分别是:尝试申请,如果没有申请到资源,就会放弃申请;每隔一段时间进行申请,即 timeout
释放信号量(发布信号量)
#include <semaphore.h>
int sem_post(sem_t *sem);
参数:将资源释放到哪个信号量中
返回值:成功返回 0
,失败返回 -1
,并设置错误码
这批接口属于是看一眼就会用,再多看一眼就会爆炸,参数、返回值含义基本都相同,非常容易上手,接下来直接用信号量实现 「生产者消费者模型」
4、基于环形队列实现生产者消费者模型
4.1、环形队列
「生产者消费者模型」 中的交易场所是可更换的,不仅可以使用 阻塞队列,还可以使用 环形队列,所谓的 环形队列 并非 队列,而是用数组模拟实现的 “队列”, 并且它的 判空、判满 比较特殊
如何让 环形队列 “转” 起来?
- 可以通过取模的方式(可以重复获取一段区间值),确定下标
环形队列 如何判断当前为满、为空?
策略一:多开一个空间,head
、tail
位于同一块空间中时,表示当前队列为空;在进行插入、获取数据时,都是对下一块空间中的数据进行操作,因为多开了一块空间,当待生产的数据落在 head
指向的空间时,就表示已经满了
策略二:参考阻塞队列,搞一个计数器,当计数器的值为 0
时,表示当前为空,当计数器的值为容量时,表示队列为满
这两种策略都可以确保 环形队列 正确判空和判满,至于这里肯定是选择策略二,因为 「信号量」 本身就是一个天然的计数器
在 环形队列 中,生产者 和 消费者 关心的资源不一样:生产者只关心是否有空间放数据,消费者只关心是否能从空间中取到数据
除非两者相遇,其他情况下生产者、消费者可以并发运行(同时访问环形队列)
两者错位时正常进行生产消费就好了,但两者相遇时需要特殊处理,也就是处理 空、满 两种情况,这就是 环形队列 的运转模式
这里可以引入一个小游戏,来辅助理解 环形队列 的运转模式
假设存在一个大圆桌,上面摆放了一圈空盘子,可以往上面放苹果,也可以取上面的苹果
张三和李四打算展开一场 苹果追逐赛,张三作为 追逐方,目标是移动并获取盘子中的苹果,李四作为 被追逐方,目标是往盘子中放苹果,并向下一个空盘子移动
注意:这里的移动指顺时针移动,不能跳格,这是游戏核心规则
游戏基本规则:
- 当两者相遇,且圆桌中没有苹果时,被追逐方(李四)先跑,对方(张三)阻塞
- 当两者相遇,且圆桌中全是苹果时,追逐方(张三)先跑,对方(李四)阻塞
- 被追逐方(李四)不能套圈追逐方(张三)
- 同时追逐方(张三)也不能超过被追逐方(李四)
ok,现在游戏开始,张三和李四处于同一块空间中(起点),此时两人处于一种特殊情况中,不能同时进行 苹果拾取/苹果放置,由于是刚开始,作为 被追逐方 的李四理应先走,否则两者就都阻塞了(张三追上李四时的情况与刚开始的情况一致)
所以可以得出结论:环形队列为空时,生产者需要先生产数据,消费者阻塞
李四先跑,边跑边放苹果,此时因为张三还没有追上李四,所以张三也是边跑边拾取苹果,两者展开了激烈的追逐赛(高效率)
在追逐过程中,张三李四都能同时对圆桌中的格子进行操作,这是非常高效的,环形队列不为空、不为满时,生产者、消费者可以同时进行并发操作
游戏进行到白热化阶段,法外狂徒张三一不注意摔了一跤,导致拾取苹果的速度不断减慢,李四见状火力全开,不断放置苹果,很快张三就被李四追上了,此时场上已经摆满了苹果,规定一个盘子只能放置一个苹果,李四无法在放置苹果,只能阻塞等待张三进行苹果拾取
场上摆满苹果的情况对应着 环形队列为满的情况,生产者不能再生产,消费者需要进行消费
ok,游戏到这里就可以结束了,因为已经足够总结出 环形队列 的运作模式了
被追逐方(李四) -> 生产者
追逐方(张三) -> 消费者
大圆桌 -> 环形队列
空盘 -> 无数据,可生产
苹果 -> 有数据,可消费
运作模式
- 环形队列为空时:消费者阻塞,只能由生产者进行生产,生产完商品后,消费者可以消费商品
- 环形队列为满时:生产者阻塞,只能由消费者进行消费,消费完商品后,生产者可以生产商品
- 其他情况:生产者、消费者并发运行,各干各的事,互不影响
张三和李四也就只能在 满、空 时相遇了
忘记张三和李四的小游戏,将 环形队列 的运行模式带入 「生产者消费者模型」
可以使用 「信号量」 标识资源的使用情况,但生产者和消费者关注的资源并不相同,所以需要使用两个 「信号量」 来进行操作
- 生产者信号量:标识当前有多少可用空间
- 消费者信号量:标识当前有多少数据
如果说搞两个 条件变量 是 阻塞队列 的精髓,那么搞两个 信号量 就是 环形队列 的精髓,显然,刚开始的时候,生产者信号量初始值为环形队列的大小,消费者信号量初始值为 0
无论是生产者还是消费者,只有申请到自己的 「信号量」 资源后,才进行 生产 / 消费
比如上图中的 pro_sem
就表示 生产者还可以进行 3
次生产,con_sem
表示 消费者还可以消费 5
次
生产者、消费者对于 「信号量」 的申请可以这样理解
// 生产者
void Producer()
{
// 申请信号量(空位 - 1)
sem_wait(&pro_sem);
// 生产商品
// ...
// 释放信号量(商品 + 1)
sem_post(&con_sem);
}
// 消费者
void Consumer()
{
// 申请信号量(商品 - 1)
sem_wait(&con_sem);
// 消费商品
// ...
// 释放信号量(空位 + 1)
sem_post(&pro_sem);
}
生产者和消费者指向同一个位置时保证线程安全,其他情况保证并发度
至于怎么落实到代码中,需要接着往下看
4.2、单生产单消费模型
首先来实现简单点的单生产、单消费版 「生产者消费者模型」
起手先创建一个 环形队列 头文件
创建
RingQueue.hpp
头文件
#pragma once
#include <vector>
#include <semaphore.h>
namespace Yohifo
{
#define DEF_CAP 10
template<class T>
class RingQueue
{
public:
RingQueue(size_t cap = DEF_CAP)
:_cap(cap), _pro_step(0), _con_step(0)
{
_queue.resize(_cap);
// 初始化信号量
sem_init(&_pro_sem, 0, _cap);
sem_init(&_con_sem, 0, 0);
}
~RingQueue()
{
// 销毁信号量
sem_destroy(&_pro_sem);
sem_destroy(&_con_sem);
}
// 生产商品
void Push(const T &inData)
{
// 申请信号量
P(&_pro_sem);
// 生产
_queue[_pro_step++] = inData;
_pro_step %= _cap;
// 释放信号量
V(&_con_sem);
}
// 消费商品
void Pop(T *outData)
{
// 申请信号量
P(&_con_sem);
// 消费
*outData = _queue[_con_step++];
_con_step %= _cap;
// 释放信号量
V(&_pro_sem);
}
private:
void P(sem_t *sem)
{
sem_wait(sem);
}
void V(sem_t *sem)
{
sem_post(sem);
}
private:
std::vector<T> _queue;
size_t _cap;
sem_t _pro_sem;
sem_t _con_sem;
size_t _pro_step; // 生产者下标
size_t _con_step; // 消费者下标
};
}
细节:
- 生产者的信号量初始值为
DEF_CAP
- 消费者的信号量初始值为
0
- 生产者、消费者的起始下标都为
0
在没有 互斥锁 的情况下,是如何 确保生产者与消费者间的互斥关系的?
通过两个 信号量,当两个 信号量 都不为 0
时,双方可以并发操作,这是 环形队列 最大的特点;当 生产者信号量为 0
时,生产者陷入阻塞等待,等待消费者消费;同理当 消费者信号量为 0
时,消费者也会阻塞住,在这里阻塞就是 互斥 的体现。当对方完成 生产 / 消费 后,自己会解除阻塞状态,而这就是 同步
目前代码没问题(单生产单消费场景中)
创建
cp.cc
源文件(可以复用之前的测试代码)
这个没啥好说的,直接 copy
之前的代码,稍微修改下类名即可
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include "RingQueue.hpp"
void* Producer(void *args)
{
Yohifo::RingQueue<int>* rq = static_cast<Yohifo::RingQueue<int>*>(args);
while(true)
{
// 生产者慢一点
sleep(1);
// 1.生产商品(通过某种渠道获取数据)
int num = rand() % 10;
// 2.将商品推送至阻塞队列中
rq->Push(num);
std::cout << "Producer 生产了一个数据: " << num << std::endl;
std::cout << "------------------------" << std::endl;
}
pthread_exit((void*)0);
}
void* Consumer(void *args)
{
Yohifo::RingQueue<int>* rq = static_cast<Yohifo::RingQueue<int>*>(args);
while(true)
{
// 消费者慢一点
// sleep(1);
// 1.从阻塞队列中获取商品
int num;
rq->Pop(&num);
// 2.消费商品(结合某种具体业务进行处理)
std::cout << "Consumer 消费了一个数据: " << num << std::endl;
std::cout << "------------------------" << std::endl;
}
pthread_exit((void*)0);
}
int main()
{
// 种 种子
srand((size_t)time(nullptr));
// 创建一个阻塞队列
Yohifo::RingQueue<int>* rq = new Yohifo::RingQueue<int>;
// 创建两个线程(生产者、消费者)
pthread_t pro, con;
pthread_create(&pro, nullptr, Producer, rq);
pthread_create(&con, nullptr, Consumer, rq);
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
delete rq;
return 0;
}
编译并运行程序。为了使结果更加清晰,分别展示 生产者每隔一秒生产一次、消费者每隔一秒消费一次的结果
生产者每隔一秒生产一次
消费者每隔一秒消费一次
这里的运行结果与 阻塞队列 那边的一模一样,证明当前的 「生产者消费者模型」 没有问题(单生产单消费场景中)
注:如果想要提高并发度,可以增大环形队列的容量
4.3、多生产多消费模型
环形队列 中可不止能放整数,还能放 Task
任务,我们可以把之前的 Task.hpp
引入,重新测试 环形队列
引入
Task.hpp
创建
cp.cc
,在之前的基础上略微修改
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include "RingQueue.hpp"
#include "Task.hpp"
void* Producer(void *args)
{
Yohifo::RingQueue<Yohifo::Task<int>>* rq = static_cast<Yohifo::RingQueue<Yohifo::Task<int>>*>(args);
// 运算符集
std::string opers = "+-*/%";
while(true)
{
// 1.生产商品(通过某种渠道获取数据)
// 随机获取两个数(可以改为输入)
int x = rand() % 100;
int y = rand() % 100;
// 随机获取一种运算符
char ops[] = {'+', '-', '*', '/', '%'};
char op = opers[rand() % opers.size()];
// 生产商品需要时间
usleep(10000);
// 2.将商品推送至阻塞队列中
// 创建匿名对象,并 Push 入阻塞队列中
rq->Push(Yohifo::Task<int>(x, y, op));
std::cout << "Producer 生产了: " << x << " " << y << " " << op << " 构成的对象" << std::endl;
std::cout << "----------------------------" << std::endl;
}
pthread_exit((void*)0);
}
void* Consumer(void *args)
{
Yohifo::RingQueue<Yohifo::Task<int>>* rq = static_cast<Yohifo::RingQueue<Yohifo::Task<int>>*>(args);
while(true)
{
// 1.从阻塞队列中获取商品
Yohifo::Task<int> task;
rq->Pop(&task);
// 进行业务处理
task();
// 消费商品也需要时间
usleep(10000);
std::string ret = task.GetResult();
// 2.消费商品(结合某种具体业务进行处理)
std::cout << "Consumer 消费了一个对象,并获得结果: " << ret << std::endl;
std::cout << "===========================" << std::endl;
}
pthread_exit((void*)0);
}
int main()
{
// 种 种子
srand((size_t)time(nullptr));
// 创建一个阻塞队列
Yohifo::RingQueue<Yohifo::Task<int>>* rq = new Yohifo::RingQueue<Yohifo::Task<int>>;
// 创建两个线程(生产者、消费者)
pthread_t pro, con;
pthread_create(&pro, nullptr, Producer, rq);
pthread_create(&con, nullptr, Consumer, rq);
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
delete rq;
return 0;
}
环形队列版的 CP
模型也能适用于任务执行
接下来可以实现 多生产多消费场景 中的 CP
模型了,多生产多消费无非就是增加了 消费者与消费者、生产者与生产者 间的 互斥 关系,加锁就行了,现在问题是加几把锁?
答案是 两把,因为当前的 生产者和消费者 关注的资源不一样,一个关注剩余空间,另一个关注是否有商品,一把锁是无法锁住两份不同资源的,所以需要给 生产者、消费者 各配一把锁
阻塞队列 中为什么只需要一把锁?
因为阻塞队列中的共享资源是一整个队列,生产者和消费者访问的是同一份资源,所以一把锁就够了
#pragma once
#include <vector>
#include <mutex>
#include <semaphore.h>
namespace Yohifo
{
#define DEF_CAP 10
template<class T>
class RingQueue
{
public:
RingQueue(size_t cap = DEF_CAP)
:_cap(cap), _pro_step(0), _con_step(0)
{
_queue.resize(_cap);
// 初始化信号量
sem_init(&_pro_sem, 0, _cap);
sem_init(&_con_sem, 0, 0);
// 初始化互斥锁
pthread_mutex_init(&_pro_mtx, nullptr);
pthread_mutex_init(&_con_mtx, nullptr);
}
~RingQueue()
{
// 销毁信号量
sem_destroy(&_pro_sem);
sem_destroy(&_con_sem);
// 销毁互斥锁
pthread_mutex_destroy(&_pro_mtx);
pthread_mutex_destroy(&_con_mtx);
}
// 生产商品
void Push(const T &inData)
{
// 申请信号量
P(&_pro_sem);
Lock(&_pro_mtx);
// 生产
_queue[_pro_step++] = inData;
_pro_step %= _cap;
UnLock(&_pro_mtx);
// 释放信号量
V(&_con_sem);
}
// 消费商品
void Pop(T *outData)
{
// 申请信号量
P(&_con_sem);
Lock(&_con_mtx);
// 消费
*outData = _queue[_con_step++];
_con_step %= _cap;
UnLock(&_con_mtx);
// 释放信号量
V(&_pro_sem);
}
private:
void P(sem_t *sem)
{
sem_wait(sem);
}
void V(sem_t *sem)
{
sem_post(sem);
}
void Lock(pthread_mutex_t *lock)
{
pthread_mutex_lock(lock);
}
void UnLock(pthread_mutex_t *lock)
{
pthread_mutex_unlock(lock);
}
private:
std::vector<T> _queue;
size_t _cap;
sem_t _pro_sem;
sem_t _con_sem;
size_t _pro_step; // 生产者下标
size_t _con_step; // 消费者下标
pthread_mutex_t _pro_mtx;
pthread_mutex_t _con_mtx;
};
}
细节: 加锁行为放在信号量申请成功之后,可以提高并发度
在 环形队列 中,可以在申请 「信号量」 前进行加锁,也可以在申请 「信号量」 后进行加锁,这里比较推荐的是 在申请 「信号量」 后加锁
如何理解?
这就好比一群学生在进行座位编排,可以先放一个学生进入教室,再给他确定座位;也可以先给每个人确定好自己的座位(一人一座),然后排队进入教室,对号入座即可。先申请 「信号量」 相当于先确定座位,避免进入教室(加锁)后还得选座位
加锁意味着串行化,一定会降低效率,但因为 「信号量」 的操作是原子的,可以确保线程安全,也就不需要加锁保护;也就是可以并发申请 「信号量」,再串行化访问临界资源
接下来增加 生产者、消费者 的线程数量,并进行测试
修改
cp.cc
// ...
int main()
{
// 种 种子
srand((size_t)time(nullptr));
// 创建一个阻塞队列
Yohifo::RingQueue<Yohifo::Task<int>>* rq = new Yohifo::RingQueue<Yohifo::Task<int>>;
// 创建多个线程(生产者、消费者)
pthread_t pro[10], con[20];
for(int i = 0; i < 10; i++)
pthread_create(pro + i, nullptr, Producer, rq);
for(int i = 0; i < 20; i++)
pthread_create(con + i, nullptr, Consumer, rq);
for(int i = 0; i < 10; i++)
pthread_join(pro[i], nullptr);
for(int i = 0; i < 20; i++)
pthread_join(con[i], nullptr);
delete rq;
return 0;
}
此时一批线程就都运行起来了
阻塞队列 效率已经够高了,那么创造 环形队列 的意义在哪呢?
首先要明白 「生产者消费者模型」 高效的地方从来都不是往缓冲区中放数据、从缓冲区中拿数据
对缓冲区的操作对于计算机说就是小 case
,需要关注的点在于 获取数据和消费数据,这是比较耗费时间的,阻塞队列 至多支持获取 一次数据获取 或 一次数据消费,在代码中的具体体现就是 所有线程都在使用一把锁,并且每次只能 push
、pop
一个数据;而 环形队列 就不一样了,生产者、消费者 可以通过 条件变量 知晓数据获取、数据消费次数,并且由于数据获取、消费操作没有加锁,支持并发,因此效率十分高
环形队列 中允许 N
个生产者线程一起进行数据获取,也允许 N
个消费者线程一起进行数据消费,简单任务处理感知不明显,但复杂任务就不一样了,这就有点像同时下载多份资源,是可以提高效率的
注意: 一起操作并非同时操作,任务开始时间有先后,但都是在进行处理的
环形队列 一定优于 阻塞队列 吗?
答案是否定的,存在即合理,如果 环形队列 能完全碾压 阻塞队列,那么早就不用学习 阻塞队列 了,这两种都属于 「生产者消费者模型」 常见的交易场所,有着各自的适用场景
特征 | 阻塞队列(互斥锁实现) | 环形队列(信号量实现) |
---|---|---|
内部同步机制 | 使用互斥锁或类似的锁机制来实现线程安全 | 使用信号量来实现线程安全 |
阻塞操作 | 支持阻塞操作,当队列为空或已满时,线程可以等待 | 也支持阻塞操作,当队列为空或已满时,线程可以等待 |
数据覆盖 | 通常不会覆盖已有元素,新元素添加时需要等待队列有空间 | 有界的,当队列已满时,添加新元素会覆盖最早的元素 |
实现复杂度 | 实现可能较为复杂,需要处理锁的获取和释放 | 实现相对较简单,需要管理信号量 |
线程安全 | 通过锁来保证线程安全,容易引入死锁问题 | 通过信号量来保证线程安全,不易引入死锁问题 |
添加和删除操作时间复杂度 | O(1)(在队列未满或非空时) | O(1)(常数时间,除非队列已满或为空) |
应用场景 | 多线程数据传递,任务调度,广播通知等 | 循环缓存,数据轮询,循环任务调度等 |
🌆总结
以上就是本次关于 Linux多线程【生产者消费者模型】的全部内容了,在本文中我们首先学习了「生产者消费者模型」的基本概念,然后学习了阻塞队列与环形队列这两种交易场所,并分别用代码进行了实现,当然还学习了信号量这个强大工具。多线程编程中,最重要的是确保线程安全问题,而 「生产者消费者模型」 在确保线程安全的同时提高了并发操作的效率,值得学习和使用
相关文章推荐 Linux多线程 =====:>
【初始多线程】、【线程控制】、【线程互斥与同步】Linux进程信号 ===== :>
【信号产生】、【信号保存】、【信号处理】Linux进程间通信 ===== :>
【消息队列、信号量】、【共享内存】、【命名管道】、【匿名管道】
Linux基础IO ===== :>
【软硬链接与动静态库】、【深入理解文件系统】、【模拟实现C语言文件流】、【重定向及缓冲区理解】、【文件理解与操作】
Linux进程控制 ===== :>
【简易版bash】、【进程程序替换】、【创建、终止、等待】
Linux进程学习 ===== :>
【进程地址】、【环境变量】、【进程状态】、【基本认知】
Linux基础 ===== :>
【gdb】、【git】、【gcc/g++】、【vim】、Linux 权限理解和学习、听说Linux基础指令很多?这里都帮你总结好了