Linux多线程系列三: 生产者消费者模型,信号量,基于阻塞队列和环形队列的这两种生产者消费者代码的实现
- 一.生产者消费者模型的理论
- 1.现实生活中的生产者消费者模型
- 2.多线程当中的生产者消费者模型
- 3.理论
- 二.基于阻塞队列的生产者消费者模型的基础代码
- 1.阻塞队列的介绍
- 2.大致框架
- 1.BlockQueue.hpp
- 2.cp_based_block_queue.cpp
- 3.LockGuard.hpp
- 3.BlockQueue.hpp的编写
- 4.测试代码的编写
- 5.演示
- 三.基于阻塞队列的生产者消费者模型的扩展代码
- 1.传递任务版本
- 1.Task.hpp
- 2.测试代码
- 3.演示
- 2.多生产者多消费者版本
- 1.改测试代码
- 四.生产者消费者模型的再一次理解与阻塞队列版本的优劣
- 1.多执行流解耦
- 2.提高效率
- 3.小结一下
- 五.信号量的深入理解与使用
- 1.理论
- 2.接口介绍
- 六.基于环形队列的单生产者单消费者模型
- 1.思路
- 2.基础代码
- 1.RingQueue.hpp
- 1.结构
- 2.代码
- 3.细节
- 3.扩展代码
- 七.基于环形队列的多生产者多消费者模型
- 1.先申请信号量,后申请锁的原因
- 2.RingQueue代码
- 3.测试
- 八.基于环形队列的生产者消费者模型与基于阻塞队列的生产者消费者模型的对比
学习了同步与互斥之后,我们来学一下应用同步与互斥的一个非常重要的模型:生产者消费者模型
一.生产者消费者模型的理论
1.现实生活中的生产者消费者模型
我们理解一下它们之间的关系
2.多线程当中的生产者消费者模型
3.理论
大家目前就先记住三种关系即可,知道目的和如何实现
然后我们直接开始写代码,写完扩展代码之后在解释原因
二.基于阻塞队列的生产者消费者模型的基础代码
1.阻塞队列的介绍
2.大致框架
1.BlockQueue.hpp
不要忘了条件变量是🔔哦
2.cp_based_block_queue.cpp
c:consumer
p:productor
基于阻塞队列的cp模型
3.LockGuard.hpp
#pragma once
//构造: 申请锁
//析构: 释放锁
class LockGuard
{
public:
LockGuard(pthread_mutex_t* lock)
:pmutex(lock)
{
pthread_mutex_lock(pmutex);
}
~LockGuard()
{
pthread_mutex_unlock(pmutex);
}
private:
pthread_mutex_t* pmutex;
};
这是我们之前利用RAII思想封装的锁的守卫者/聪明的锁,我们先不用它,最后再用(因为它太好用了,好用到不方便解释)
3.BlockQueue.hpp的编写
下面就剩下Push和Pop了
代码:
#pragma once
#include <pthread.h>
#include <queue>
#include "Lock_guard.hpp"
const int defaultSize=5;//默认大小为5
template<class T>
class BlockQueue
{
public:
BlockQueue(int maxSize=defaultSize)//工作: 初始化锁,条件变量,_maxSize
:_maxSize(maxSize)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_c_cond,nullptr);
pthread_cond_init(&_p_cond,nullptr);
}
~BlockQueue()//释放: 初始化锁,条件变量
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_c_cond);
pthread_cond_destroy(&_p_cond);
}
void Push(const T& data)//生产者放数据
{
//1. 生产者和生产者之间互斥,而且队列只能在队尾放数据: 因此需要加锁
pthread_mutex_lock(&_mutex);
//2. 如果队列满了,生产者需要去自己的条件变量下排队,等待消费者唤醒
//if(Full()): 无法防止伪唤醒带来的bug
while(Full())//能够防止伪唤醒带来的bug -> 代码的鲁棒性强
{
pthread_cond_wait(&_p_cond,&_mutex);
}
//3. 条件满足,直接放数据即可
_q.push(data);
//4. 大家可以自定义生产多少数据之后再唤醒消费者,我这里先暂且: 只要有一个数据就唤醒消费者
pthread_cond_signal(&_c_cond);//摇消费者的铃铛,唤醒一个消费者
//5. Push完成 -> 释放锁
pthread_mutex_unlock(&_mutex);
}
void Pop(T& data)//消费者拿数据(跟Push异曲同工之妙)
{
//1. 消费者跟消费者之间互斥,且队列只能从队头出数据: 因此需要加锁
pthread_mutex_lock(&_mutex);
//2. 判断是否空
while(Empty())
{
pthread_cond_wait(&_c_cond,&_mutex);//去自己的条件变量那里排队,等着生产者唤醒
}
//3. 条件满足,直接拿数据即可
data=_q.front();
_q.pop();
//4. 只要拿了一个数据就唤醒生产者,当然大家可以自定义
pthread_cond_signal(&_p_cond);//摇生产者的铃铛,唤醒一个生产者
//5. Pop完成 -> 释放锁
pthread_mutex_unlock(&_mutex);
}
bool Full() const//判满
{
return _q.size()==_maxSize;//判断队列中的数据个数是否==_maxSize即可
}
bool Empty() const//判空
{
return _q.empty();//复用即可
}
private:
queue<T> _q;//内部封装的STL的queue
pthread_mutex_t _mutex;
//一把互斥锁即可 (因为生产者之间互斥,消费者之间互斥,生产者和消费者之间互斥,因此阻塞队列在同一时刻只允许一个线程进行访问!!)
pthread_cond_t _p_cond;//productor生产者的条件变量
pthread_cond_t _c_cond;//consumer消费者的条件变量
int _maxSize;//阻塞队列的大小(因为阻塞队列需要能够判满)
};
4.测试代码的编写
代码:
#include <iostream>
#include <unistd.h>
using namespace std;
#include "BlockQueue.hpp"
#include <random>
#include <chrono>
// 生成指定范围内的随机整数(不用管这个)
int generateRandomInt(int min, int max) {
// 使用基于当前时间的种子
static random_device rd;
static mt19937 gen(rd());
// 定义随机数分布
uniform_int_distribution<> dis(min, max);
// 生成随机数
return dis(gen);
}
void* productor_func(void* arg)//生产者:放数据
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(arg);
while(true)
{
//1. 生产数据
int data=generateRandomInt(1,9);
//2. 放数据
bq->Push(data);
//3. 打印数据
cout<<"productor_func put data: "<<data<<endl;
//4. 休眠/不休眠随意
sleep(1);
}
}
void* consumer_func(void* arg)//消费者:拿数据
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(arg);
while(true)
{
//1. 拿数据
int data=-1;
bq->Pop(data);
//2. 处理数据,我们就先暂且打印了
cout<<"consumer_func get data: "<<data<<endl;
//3. 休眠/不休眠随意
}
}
int main()
{
srand(time(nullptr));
BlockQueue<int>* bq=new BlockQueue<int>;
pthread_t consumer_id,productor_id;
pthread_create(&consumer_id,nullptr,consumer_func,bq);
pthread_create(&productor_id,nullptr,productor_func,bq);
pthread_join(consumer_id,nullptr);
pthread_join(productor_id,nullptr);
delete bq;
return 0;
}
5.演示
情况1: 生产者不休眠,消费者休眠
情况2: 消费者不休眠,生产者休眠
看到了生产者和消费者的确具有同步的关系
三.基于阻塞队列的生产者消费者模型的扩展代码
1.传递任务版本
刚才的时候我们的阻塞队列当中放的是纯数据,我们可不可以放任务呢?
就像是我们进程池当中主进程向其他进程发送任务让其他线程执行似的
所以我们创建一个Task.hpp
1.Task.hpp
代码:
#pragma once
#include <unordered_map>
#include <functional>
//我们就模拟数学运算吧: + - * / % & | && ||
//因为~和!是单操作符,所以我们就不搞这两个操作符了
enum State
{
believable = 0,//可信
division_by_zero,//除0
mod_by_zero,//模0
unknown,//非法操作符
};
vector<string> opers={"+","-","*","/","%","&","|","&&","||"};
class Task
{
public:
Task()=default;
Task(int left_operand,int right_operand,string op)
:_left_operand(left_operand),_right_operand(right_operand),_op(op){}
string DebugForProductor() const
{
return to_string(_left_operand)+_op+to_string(_right_operand)+" = ?";
}
string DebugForConsumer() const
{
return to_string(_left_operand)+_op+to_string(_right_operand)+" = "+to_string(_ans)+"["+_stateMap[_state]+"]";
}
//进行操作运算
void operator()()
{
if(_opMap.count(_op)==0)//操作符非法
{
_state=unknown;
return;
}
_ans=_opMap[_op](_left_operand,_right_operand,_state);
}
private:
int _left_operand;//左右操作数
int _right_operand;
string _op;//运算符
int _ans;//答案
State _state=believable;//答案的状态
static unordered_map<string,function<int(int,int,State&)>> _opMap;//操作表
static unordered_map<State,string> _stateMap;//状态表
};
unordered_map<string,function<int(int,int,State&)>> Task::_opMap={
{"+",[](int a,int b,State& s) {return a+b;}},
{"-",[](int a,int b,State& s) {return a-b;}},
{"*",[](int a,int b,State& s) {return a*b;}},
{"&",[](int a,int b,State& s) {return a&b;}},
{"|",[](int a,int b,State& s) {return a|b;}},
{"&&",[](int a,int b,State& s) {return a&&b;}},
{"||",[](int a,int b,State& s) {return a||b;}},
{"/",[](int a,int b,State& s) {
if(b==0) {s=division_by_zero; return 0;}
else return a/b;}},
{"%",[](int a,int b,State& s) {
if(b==0) {s=mod_by_zero; return 0;}
else return a%b;}}
};
unordered_map<State,string> Task::_stateMap={
{believable,"believable"},{division_by_zero,"division_by_zero"},{mod_by_zero,"mod_by_zero"},{unknown,"unknown"}
};
我们这份代码的好处是方便扩展,坏处是效率有些慢,没有疯狂if else或者switch case快
2.测试代码
3.演示
运行成功
2.多生产者多消费者版本
下面我们把它"改"成多生产多消费,这里加""是因为我们实现的时候就已经确保生产者生产者互斥,消费者消费者互斥,生产者消费者互斥了,所以根本无需改动我们的阻塞队列
但是我们要改测试代码了
因为有多生产,多消费,所以我们搞3生产者,2消费者,给它们做个编号,这5个线程共用同一个阻塞队列
因此我们封装一下阻塞队列,把阻塞队列和编号/名字封装一下,并且用一下我们的lockguard
1.改测试代码
代码:
#include <iostream>
#include <unistd.h>
#include <vector>
using namespace std;
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <random>
#include <chrono>
// 生成指定范围内的随机整数
int generateRandomInt(int min, int max) {
// 使用基于当前时间的种子
static random_device rd;
static mt19937 gen(rd());
// 定义随机数分布
uniform_int_distribution<> dis(min, max);
// 生成随机数
return dis(gen);
}
template<class T>
struct ThreadData
{
ThreadData(const string& name,BlockQueue<T>* bq)
:_name(name),_bq(bq){}
string _name;
BlockQueue<T>* _bq;
};
pthread_mutex_t print_mutex=PTHREAD_MUTEX_INITIALIZER;
void* productor_func(void* arg)//生产者:放数据
{
ThreadData<Task>* td=static_cast<ThreadData<Task>*>(arg);
while(true)
{
//1. 生产数据
int ldata=generateRandomInt(0,9),rdata=generateRandomInt(0,9);
int i=generateRandomInt(0,opers.size()-1);
Task t(ldata,rdata,opers[i]);
//2. 放数据
td->_bq->Push(t);
//3. 打印数据
LockGuard lockguard(&print_mutex);
cout<<t.DebugForProductor()<<" # # : "<<td->_name<<endl;
//4. 每放一次数据 -> 休眠100ms -> 0.1s
usleep(1000000);
}
}
void* consumer_func(void* arg)//消费者:拿数据
{
ThreadData<Task>* td=static_cast<ThreadData<Task>*>(arg);
while(true)
{
//1. 拿数据
Task t;
td->_bq->Pop(t);
//2. 处理数据,
t();
LockGuard lockguard(&print_mutex);
cout<<t.DebugForConsumer()<<" # # : "<<td->_name<<endl;
//3. 不休眠,疯狂拿数据
}
}
int main()
{
BlockQueue<Task>* bq=new BlockQueue<Task>;
vector<pthread_t> v(5);
vector<ThreadData<Task>*> del;
for(int i=0;i<5;i++)
{
ThreadData<Task>* td=new ThreadData<Task>("thread - "+to_string(i),bq);
if(i<3)
{
pthread_create(&v[i],nullptr,productor_func,td);
}
else
{
pthread_create(&v[i],nullptr,consumer_func,td);
}
del.push_back(td);
}
for(auto& e:v) pthread_join(e,nullptr);
delete bq;
for(auto& e:del) delete e;
return 0;
}
演示:
当然,大家可以自定义生产者生产多少数据之后再唤醒消费者,消费者消费多少数据之后在唤醒生产者
四.生产者消费者模型的再一次理解与阻塞队列版本的优劣
我们解释一下生产者消费者模型的优点:
1.多执行流解耦
2.提高效率
3.小结一下
生产者消费者模型:
通过交易场所这个大的临界资源来存放交易数据实现了多执行流之间的解耦,
从而使得生产者创建数据和消费者处理数据的工作能够跟其他线程实现并发执行,从而提高效率
只不过因为阻塞队列是把整个队列当作一个整体,所以阻塞队列在任意时刻只允许一个线程进行访问,其他线程必须正在阻塞
这个操作降低了阻塞队列的一点点效率,但是跟阻塞队列带来的优势相比,在权衡之下,依旧是优点大大高于不足
五.信号量的深入理解与使用
1.理论
我们之前在介绍System V版本的进程间通信的时候,介绍了信号量的理论,并且用信号量实现了共享内存的协同机制
下面我们稍微复习总结一下信号量的理论
还有一点:
信号量本身就具有互斥和同步的功能!!
而锁只有互斥的功能,想要同步,必须配合条件变量等等机制才能实现同步
记住: 锁:🔒,条件变量:🔔,信号量:🔢(计数器)
2.接口介绍
相比于System V的接口来说,pthread库当中信号量的接口就简洁很多
我们就只需要用这4个接口即可,下面直接在基于环形队列的生产者消费者模型的代码当中用一下信号量了
因为环形队列的生产者和消费者之间的互斥可以用信号量🔢来维护,所以我们用一下环形队列这个数据结构作为交易场所
又因为生产者和生产者,消费者和消费者之间也是互斥的,而它们之间的互斥怎么保证呢?
这点比起阻塞队列的统统挂锁🔒要难以理解一点,所以我们先实现单生产者单消费者模型,然后再改成多生产者多消费者模型
六.基于环形队列的单生产者单消费者模型
1.思路
这里我们用了信号量之后根本就不需要条件变量了,因为
队列为空时: Pop会阻塞消费者,但是当生产者Push数据之后,sem_data就++了,因此Pop阻塞的消费者就能够申请到sem_data了
同理,队列为满时: Push会阻塞生产者,但是当消费者Pop数据之后,sem_space就++了,因此Push阻塞的生产者就能够申请到sem_space了
而且环形队列的大小就是vector一开始初始化的size().因此也无需我们在设置一个变量了
2.基础代码
刚写完阻塞队列的生产者消费者模型,那单生产单消费的环形队列没啥大区别,这里直接用ThreadData了
用一下类似于适配器模式的样子,你给我传什么阻塞队列/环形队列/xxx容器/yyy容器,无所谓,我都给你绑定一个字符串
1.RingQueue.hpp
1.结构
2.代码
这里就先不给出源码了,因为这个场景对多生产多消费并不适用,为何?
那么push的时候先加锁还是先申请信号量呢??
这里比较不太好理解,我们放到改成多生产多消费的时候再谈,因为现在有一个更重要的发现需要我们介绍
3.细节
需要实现同步+互斥的时候
锁必须配合条件变量进行使用(不考虑锁能配合信号量一起使用)
而有时信号量可以无需配合条件变量进行使用
因此信号量才被称为"对资源的预定机制",因为这种情况下它不知不觉就自动实现了同步
因此信号量本身就具有互斥和同步的功能!!
而锁只有互斥的功能,想要同步,必须配合条件变量等等机制才能实现同步
3.扩展代码
下面直接用我们的Task.hpp,啥也不用改,拿过头文件来直接用就行
跟阻塞队列的一样,没啥好解释的
七.基于环形队列的多生产者多消费者模型
1.先申请信号量,后申请锁的原因
刚才我们说了,Push和Pop想要改成多生产者多消费者一定要加锁,那么先加锁还是先申请信号量呢?
代码的正确性上讲,其实是都可以,但是效率上是有区别的
申请信号量🔢: 本质是解决生产者和消费者之间的互斥(解决座位数目(图书馆资源)和读者需求之间的互斥)
申请锁🔒: 本质是解决生产者和生产者之间的互斥,消费者和消费者之间的互斥
因此申请信号量是解决外部矛盾,而申请锁是解决内部矛盾
而对于同时面临内外的非常严重的问题时: 解决矛盾一定是先解决外部矛盾,后解决内部矛盾
2.RingQueue代码
直接用我们的LockGuard秒了它
#pragma once
#include <semaphore.h>
const int defaultSize = 5;
template <class T>
class RingQueue
{
public:
RingQueue(int size = defaultSize)
: _p_index(0), _c_index(0)
{
_arr.resize(size);
sem_init(&_sem_space, 0, size); //_space空间个数初始值为size
sem_init(&_sem_data, 0, 0); //_data数据个数初始值:0
pthread_mutex_init(&_p_mutex,nullptr);
pthread_mutex_init(&_c_mutex,nullptr);
}
~RingQueue()
{
sem_destroy(&_sem_space);
sem_destroy(&_sem_data);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
// 我们封装一个P操作和一个V操作,方便使用
void P(sem_t &sem) // P -> wait --
{
sem_wait(&sem);
}
void V(sem_t &sem) // V -> post ++
{
sem_post(&sem);
}
void Push(const T &data)
{
// 1. 申请信号量
P(_sem_space);
// 2. 放数据即可
{
LockGuard lockguard(&_p_mutex);
_arr[_p_index] = data;
_p_index = (_p_index + 1) % _arr.size();
}
// 3. 释放信号量
V(_sem_data);
}
void Pop(T &data)
{
// 1. 申请信号量
P(_sem_data);
// 2. 放数据即可
{
LockGuard lockguard(&_c_mutex);
data = _arr[_c_index];
_c_index = (_c_index + 1) % _arr.size();
}
// 3. 释放信号量
V(_sem_space);
}
private:
vector<T> _arr; // 环形队列底层容器,环形队列大小就是_arr.size()
sem_t _sem_space; // 空间信号量
sem_t _sem_data; // 数据信号量
int _p_index; // 生产者放数据的下标
int _c_index; // 消费者拿数据的下标
pthread_mutex_t _p_mutex; // 解决生产者内部矛盾
pthread_mutex_t _c_mutex; // 解决消费者内部矛盾
};
3.测试
直接上测试代码,2个消费者,3个生产者,给cout加锁,走起
#include <iostream>
using namespace std;
#include <vector>
#include <unistd.h>
#include <pthread.h>
#include "Lock_guard.hpp"
#include "RingQueue.hpp"
#include "Task.hpp"
#include <random>
#include <chrono>
// 生成指定范围内的随机整数
int generateRandomInt(int min, int max)
{
// 使用基于当前时间的种子
static random_device rd;
static mt19937 gen(rd());
// 定义随机数分布
uniform_int_distribution<> dis(min, max);
// 生成随机数
return dis(gen);
}
// 直接搞成类似于容器适配器模式了
template <class Container>
struct ThreadData
{
ThreadData(const string &name, Container *con)
: _name(name), _con(con) {}
string _name;
Container *_con;
};
pthread_mutex_t print_mutex = PTHREAD_MUTEX_INITIALIZER;
void *consumer_func(void *arg)
{
ThreadData<RingQueue<Task>> *td = static_cast<ThreadData<RingQueue<Task>> *>(arg);
while (true)
{
int Ldata = generateRandomInt(0, 9), Rdata = generateRandomInt(0, 9), opi = generateRandomInt(0, opers.size() - 1);
Task t(Ldata, Rdata, opers[opi]);
td->_con->Push(t);
LockGuard lockguard(&print_mutex);//改成多生产者多消费者时再给打印加锁
cout << t.DebugForProductor() << " " << td->_name << endl;
sleep(1); // 生产者休眠1s
}
}
void *productor_func(void *arg)
{
ThreadData<RingQueue<Task>> *td = static_cast<ThreadData<RingQueue<Task>> *>(arg);
while (true)
{
Task t;
td->_con->Pop(t);
LockGuard lockguard(&print_mutex);//改成多生产者多消费者时再给打印加锁
t();
cout << t.DebugForConsumer() << " " << td->_name << endl;
}
}
int main()
{
RingQueue<Task> *rq = new RingQueue<Task>;
vector<pthread_t> v(5);
vector<ThreadData<RingQueue<Task>>*> delv;
//先生产者
for(int i=0;i<3;i++)
{
ThreadData<RingQueue<Task>> *td = new ThreadData<RingQueue<Task>>("thread - p"+to_string(i+1), rq);
delv.push_back(td);
pthread_create(&v[i],nullptr,productor_func,td);
}
//后消费者
for(int i=0;i<2;i++)
{
ThreadData<RingQueue<Task>> *td = new ThreadData<RingQueue<Task>>("thread - c"+to_string(i+1), rq);
delv.push_back(td);
pthread_create(&v[i+3],nullptr,consumer_func,td);
}
for(auto& e:v) pthread_join(e,nullptr);
delete rq;
for(auto& e:delv) delete e;
return 0;
}
多生产多消费测试的修改跟阻塞队列的差不多,唯一最大的变化就是这里给cout也加锁了
八.基于环形队列的生产者消费者模型与基于阻塞队列的生产者消费者模型的对比
环形队列的生产者消费者模型通过将整个交易场所划分为若干个区域,
从而将使得生产者和消费者可以在一定条件下实现并发访问环形队列,从而相比于阻塞队列来说在这一点上提高了效率
但是也不能单纯地下定义说环形队列就是比阻塞队列好
别忘了: 阻塞队列还能够由我们自定义生产者生产多少数据之后再唤醒消费者,消费者消费多少数据之后在唤醒生产者的
条件变量允许开发者根据特定的条件来决定何时唤醒线程,而信号量则通过控制资源的并发访问量来实现同步
因此阻塞队列中互斥锁配合条件变量能够使得代码更加易于控制和变化
所以两种方法各有千秋,使用哪种看具体需求和场景而定
以上就是Linux多线程系列三: 生产者消费者模型,信号量使用,基于阻塞队列和环形队列的这两种生产者消费者代码的实现的全部内容,希望能对大家所有帮助!!!