文章目录
- 1、生产者消费者模型
- 1、基于BlockingQueue的生产者消费者模型
- 2、对模型全面的认识
- 3、多生产者多消费者
- 2、信号量
- 1、POSIX信号量
- 2、基于环形队列的生产消费模型
- 3、多生产者多消费者
- 3、多生产者多消费者模型的意义
1、生产者消费者模型
顾名思义,就像是供货商,超市,顾客一样。顾客有多种多样的需求,他们不能直接去供货商那里购买,供货商有自己的规定,为了更好地满足消费者需求,供货商给超市物品,由超市来销售,消费者就可以去超市来买。供货商就是生产者,顾客就是消费者,因为有超市的存在,生产者和消费者的步调可以不一致,供货商即使不供货,超市也有存货卖给顾客,这样忙闲不均,效率也高。
对应到计算机中,生成者和消费者都是线程,超市就是一种特定的缓冲区,缓冲区可以有多种结构。生成者有有用的数据,通过缓冲区给到消费者。这貌似很像之前的通信,但又不是通信。这个模型成立的前提是缓冲区必须先被所有线程看到,也就是说,这个缓冲区是一个被多线程并发访问的公共区域,那么多线程就一定要保护共享资源的安全,并且维护线程互斥与同步的关系。如何维护?
生成者消费者模型一定有三种关系,生产者和生产者,消费者和消费者,生产者和消费者。生生之间是互斥关系,一个生产者往一块空间塞入了数据,那么其他生产者就不能再往这个空间塞入数据。生产者和消费者,顾客会去超市问有没有自己要的商品,如果没有,那就只能等一段时间再来询问,但是超市如果告诉顾客什么时候有,那么顾客就可以挂起等待,直到那个时间再来购买,同理,如果超市不告诉供货商什么时候什么货物缺了,那么供货商也无法确定要不要供货,这里可以看出,整个模型需要有同步才能高效地运行起来;不只有同步关系,顾客和供货商买和供货时应当是不一样的时间点去做,这也就是互斥关系。消费者和消费者之间,如果两个顾客要买同一根商品,那就冲突了,所以有互斥关系。
模型有两个角色,生产者,消费者;有一个交易场所,通常是缓冲区。
1、基于BlockingQueue的生产者消费者模型
BlockingQueue是阻塞队列。队列满了,放入数据的线程就不让放了,队列空了,拿数据的线程就不让拿了,和之前的管道一样。我们创建一个BlockQueue.hpp文件,main.cc文件包含这个hpp文件,一个makefile。
先建立一个框架
//BlockQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
template <class T>
class BlockQueue
{
};
//main.cc
#include "BlockQueue.hpp"
void* consumer(void* args)
{
}
void* productor(void* args)
{
}
int main()
{
//单生产和单消费
pthread_t c, p;
pthread_create(&c, nullptr, consumer, nullptr);
pthread_create(&p, nullptr, productor, nullptr);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
现在这是两个独立线程,如何让这两个线程看到同一个缓冲区?hpp文件里定义一个BlockQueue类,我们先让它的模板参数是int。
void* consumer(void* args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
}
void* productor(void* args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
}
int main()
{
BlockQueue<int> *bq = new BlockQueue<int>();
//单生产和单消费
pthread_t c, p;
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, productor, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
数据呢,实际生活中,这个数据多种多样,这里我们伪造简单的随机数来充当数据。
void* consumer(void* args)
{
sleep(1);
BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
while(true)
{
int data = 0;
//1、从阻塞队列中获取数据
bq->pop(&data);
//2、结合某种业务逻辑,处理数据
cout << "consumer data" << data << endl;
}
}
void* productor(void* args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
while(true)
{
sleep(1);
//1、先通过某种渠道获取数据
int data = rand() % 10 + 1;//也就是生成1-10
//2、将数据推送到阻塞队列 —— 生产过程
bq->push(data);
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpipd());
BlockQueue<int> *bq = new BlockQueue<int>();
//单生产和单消费
pthread_t c, p;
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, productor, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
然后开始写阻塞队列这个类的框架。
const int gcap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(const int cap = gcap):_cap(gcap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
pthread_cond_init(&_productorCond, nullptr);
}
void push(const T &in)
{}
void pop(T* out)
{}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_productorCond);
}
private:
queue<T> _q;
int _cap;//容量上限
pthread_mutex_t _mutex;
pthread_cond_t _consumerCond;//如果缓冲区为空,消费者等待,这个变量就是是否为空的条件变量
pthread_cond_t _productorCond;//如果缓冲区为满,生产者等待,这个变量就是是否为满的条件变量
};
push和pop
bool isFull() {return _q.size() == _cap; }
bool isEmpty() {return _q.empty(); }
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
if(isFull())//1、只能在临界区内部判断临界资源是否就绪,注定了当前一定持有锁
{
//2、要让线程进行休眠等待,就不能持有锁。锁只有一个,锁如果放在了阻塞线程中,那么谁也申请不了锁,就死锁了
//3、wait接口就得需要传锁,来释放锁
pthread_cond_wait(&_productorCond, &_mutex);//休眠结束后,应当从哪里继续执行?
//4、从系统角度看,休眠就是把线程给切走了,当线程醒来时,应当从临界区内部继续执行,因为线程是在临界区被切走的
//5、被唤醒时,wait函数需要重新申请锁,申请成功了才会返回,然后线程继续执行余下的代码
}
//走到这里,说明没满,就可以生产
_q.push(in);
//加策略,策略决定什么时候去唤醒,这里就不加了,直接唤醒
pthread_cond_signal(&_consumerCond);//生产者知道它自己放入了数据,所以缓冲区一定不为空,所以唤醒消费者
pthread_mutex_unlock(&_mutex);
}
void pop(T* out)
{
pthread_mutex_lock(&_mutex);
if(ifEmpty())
{
pthread_cond_wait(&_consumerCond, &_mutex);
}
*out = _q.front();
_q.pop();
//加策略,策略决定什么时候去唤醒,这里就不加了,直接唤醒
pthread_cond_signal(&_productorCond);//消费者知道它刚拿走一个数据,那么那个位置一定是空,就可以唤醒生产者生产
pthread_mutex_unlock(&_mutex);
}
到了这里,基本的生产者消费者模型就已经创建好了,现在继续完善这个模型
有没有可能误唤醒一个线程?在push里,如果是多个生产者,从wait那里被唤醒然后push数据,就可能溢出,因为原本剩下的空间可能不够这些线程去push;或者一个生产者刚wait就被消费者给唤醒了,比如不符合消费者给的策略,条件,或者消费者用了broadcast来唤醒所有生产者,生产者就被误唤醒或者伪唤醒,它会继续向后执行,去push,就会出问题。
所以我们一定要保证任何时候都要符合条件才能生产。那么push那里的if判断换成while,即使被误唤醒,这个生产者也能继续循环,判断,然后再次wait。同理pop那里也要用while循环。
2、对模型全面的认识
之前已经写过,这个模型是高效的,忙闲不均的,那么体现在哪里?生产的时候就不能消费,消费的时候也不能生产,这两个是串行的,那么如何体现高效?我们不应该只认为生产者往队列里放数据,消费者从队列里拿数据,生产者有数据来源,消费者有对数据的处理方法,生产者放入数据时,不妨碍消费者处理拿到的数据。消费拿数据时,不影响生产者从其他地方获取数据,所以生产者和消费者可以并行,这就是高效所在。阻塞队列不只放整数字符串之类的,它还可以放入对象,任务等等。我们改造一下代码。
新建一个task.hpp文件
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task()
{}
Task(int x, int y, char op):_x(x), _y(y), _op(op), _result(0), _exitCode(0)
{}
void operator()()
{
switch(_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if(_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if(_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
}
std::string formatArg()
{
return std::to_string(_x) + _op + to_string(_y) + "=";
}
std::string formatRes()
{
return std::string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
}
main.cc里
void* consumer(void* args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
Task t;
//1、从阻塞队列中获取数据
bq->pop(&t);
t();
//2、结合某种业务逻辑,处理数据
cout << "consumer data" << t.formatArg() <<t.formatRes() << endl;
}
}
void* productor(void* args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
string opers = "+-*/%";
while(true)
{
sleep(1);
//1、先通过某种渠道获取数据
int x = rand() % 20 + 1;//也就是生成1-10
int y = rand() % 10 + 1;
char op = opers[rand() % opers.size()];
//2、将数据推送到阻塞队列 —— 生产过程
Task t(x, y, op)
bq->push(t);
cout << "productor Task: " << t.formatArg() << "?" << endl;
}
}
再次执行所有代码就可以看到效果了,这只是一个简单的任务发送,像网络,通信等都可以作为数据发过去。
3、多生产者多消费者
如果只是在main.cc中
int main()
{
srand((uint64_t)time(nullptr) ^ getpipd());
//BlockQueue<int> *bq = new BlockQueue<int>();
BlockQueue<Task> *bq = new BlockQueue<Task>();
//单生产和单消费
pthread_t c, p;
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, productor, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
把c和p换成两个数组,循环创建和join,可不可以实现多生产多消费了?实际是可以的。为什么?
我们只用了一把锁,完成了生产者和消费者之间的互斥和同步关系,而生生和费费之间都是互斥关系,也就都能满足;多生产多消费高效的意义在于生产者拿到数据和消费者处理数据。
为什么只用一把锁?根本原因在于生产者和消费者访问的是同一个阻塞队列,它们三个被看作一个整体。
2、信号量
信号量是一个计数器,描述临界资源的数目;信号量需要进行PV操作,P相当于–操作,V相当于++操作,这两个操作是原子的。
如果临界资源只有一个,那么信号量就可以设为1,一申请成功,信号量减为1变成0,从临界区出来后信号量又变回1,这时候这个信号量也叫二元信号量,也就是互斥锁。
一个资源被分成多个小资源,多个线程就可以通过访问不同的小资源来实现对这个资源的并发访问,这时候的信号量就是多元信号量。每一个线程,在访问对应的资源的时候,先申请信号量,申请成功,表示该线程允许使用该资源,不成功就无法使用该资源。
信号量是一种资源的预订机制,有资源不用,只要不退出,其它线程也没法访问这个资源。
信号量既然是资源的计数器,申请成功就表明资源可用,申请失败就表明资源不可用,本质上是把判断转化成信号量的申请行为。信号量是在访问临界区和申请锁之前进行的。
1、POSIX信号量
POSIX和System V都用于同步操作,达到无冲突地访问共享资源的目的,但POSIX还可用于线程同步。
2、基于环形队列的生产消费模型
用数组来模拟环形队列。用i %= N,当到了数组尾部,再次i++,i就等于N了,那么%=N就会让i变为0,也就又来到了数组头。环形队列中,head指向头部,tail指向尾部,先放数据,再tail++。生产者向tail中push数据,消费者向head中pop数据。生产者关心空间,消费者关心数据。
环形队列中,只要访问不同的区域,生产和消费就可以同时进行,那么生产者和消费者什么时候会访问同一个区域?这其实就是一个追及问题,第一种情况是刚开始没有数据的时候,第二种情况是队列里满数据,tail又来到了队列头部,也就是head处。第一种情况要让生产者先行,第二种情况要让消费者先行。这两种情况分别对应队列为空和为满,其它情况下生产者和消费者可以并发执行。
我们的代码要保证队列为空和为满时有对应的处理,不能让消费者超过生产者,不能让生产者套圈消费者。
用计算机语言如何描述这些情况?我们要给生产者定义一个信号量sem_room,初始值为N,消费者的信号量sem_data为0;两者一开始都需要申请资源,生产者申请空间信号量,消费者申请数据信号量,这样也就保证了生产者会先行,因为生产者信号量不为空;生产者进行生产活动,指向下一个位置,它不归还空间资源,不需要改变空间信号量,只把数据信号量+1即可,消费者就会检测到自己申请成功了,那就会进行消费活动,消费者会拿走数据,但是消费者不改变数据信号量,而是把空间信号量-1,然后再走到下一个位置。两者都申请自己关心的信号量,释放对方的信号量,这样也就能满足整个环形队列的所有规则。
接下来写代码
创建Main.cc,RingQueue.hpp和Makefile文件。两个主要文件先写框架。
Makefile
ringqueue:Main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f ringqueue
Main.cc
#include "RingQueue.cc"
using namespace std;
void* consumerRoutine(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
while(true)
{
int data = 0;
rq->pop(&data);
cout << "consumer done: " << datat << endl;
sleep(1);
}
}
void* productorRoutine(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
while(true)
{
int data = rand() % 10 + 1;
rq->push(data);
cout << "productor done: " << data << endl;
sleep(1);
}
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<int> *rq = new RingQueue<int>();
//单生产单消费
pthread_t c, p;
pthread_create(&c, nullptr, consumerRoutine, nullptr);
pthread_create(&p, nullptr, productorRoutine, nullptr);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
我们要申请信号量,需要用到头文件< semaphore.h >,整体的思路和前面的生产消费模型其实一样
RingQueue.hpp文件
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include <semaphore.h>
static const int N = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t &s) {sem_wait(&s); }
void V(sme_t &s) {sem_post(&s); }//发布信号量的接口
public:
RingQueue(int num = N): _ring(num), _cap(num)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, num);
_c_step = _p_step = 0;
}
void push(const T& in)
{
//生产
P(_space_sem);//P操作,生产者需要看看空间信号量是否不为空,不空才可以继续
//不需要判断,一定有对应的空间资源给我
//因为信号量本身就是描述临界资源的,它可以在临界区外去申请,P成功就说明可以继续执行了
_ring[_p_step] = in;//_p_step是生产者的位置
++_p_step;
_p_step %= _cap;
//V操作
V(_data_sem);//一个数据放进去了,那么数据信号量就增加
}
void pop(T* out)
{
//消费
P(_data_sem);//P操作,消费者需要看看数据信号量是否不为空,不空才可以继续
*out = _ring[_c_step];//_c_step是消费者的位置
++_c_step;
_c_step %= _cap;
V(_space_sem);//一个数据被拿走,消费者往后走一步,空间信号量就减少
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
}
private:
std::vector<T> _ring;
int _cap;//环形队列大小
sem_t _data_sem;//只有消费者关心
sem_t _space_sem//只有生产者关心
int _c_step;//消费者位置
int _p_step;//生产者位置
}
生产者和消费者谁先运行不一定,但在hpp文件中,我们已经用代码确定了生成者先行。Main.cc文件中的生产消费函数中sleep用来控制谁速度更快,其实哪一个慢都会让另一个也变慢,这是因为两者在同步。
除了传int,环形队列也可以传类,可以用上面阻塞队列写法的task.hpp,在operator()()最后写上usleep(100000)来模拟任务时长,要用头文件< unistd.h >,Main.cc中这样改
#include "RingQueue.cc"
#include "task.hpp"
using namespace std;
const char* ops = "+-*/%";
void* consumerRoutine(void* args)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);
while(true)
{
Task t;
rq->pop(&t);
t();
cout << "consumer done, 处理完成的任务: " << t.formatArg() <<t.formatRes() << endl;
}
}
void* productorRoutine(void* args)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);
while(true)
{
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % strlen(ops)];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生产的任务是: " << t.formatArg() << "?" << endl;
}
}
生产和消费的两个动作都需要消耗时间,所以task中处理任务的operator()()有个usleep(100000)。
3、多生产者多消费者
上面的代码维护的是生产者和消费者之间的关系,如果改成多生产多消费,上面的代码还不足以维护生产与生产,消费与消费之间的关系,所以我们要加锁,由于是生产和生产之间,消费和消费之间,所以需要两把锁。当然,生产者和消费者也得换成数组。加锁应当在申请信号量之后,这样的话其它线程都可以去申请信号量,分配资源,如果自己可以去处理资源,再去申请锁,进行操作,而如果加锁在申请信号量之前,那么其它线程只能挂起等待,效率更低。
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include <semaphore.h>
#include <string>
#include <cstring>
static const int N = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t &s) {sem_wait(&s); }
void V(sme_t &s) {sem_post(&s); }//发布信号量的接口
void Lock(pthread_mutex_t& m) {pthread_mutex_lock(&m); }
void Unlock(pthread_mutex_t& m) {pthread_mutex_unlock(&m); }
public:
RingQueue(int num = N): _ring(num), _cap(num)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, num);
_c_step = _p_step = 0;
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
//生产
void push(const T& in)
{
P(_space_sem);//P操作,生产者需要看看空间信号量是否不为空,不空才可以继续
Lock(_p_mutex);
//不需要判断,一定有对应的空间资源给我
//因为信号量本身就是描述临界资源的,它可以在临界区外去申请,P成功就说明可以继续执行了
_ring[_p_step] = in;//_p_step是生产者的位置
++_p_step;
_p_step %= _cap;
Unlock(_p_mutex);
//V操作
V(_data_sem);//一个数据放进去了,那么数据信号量就增加
}
//消费
void pop(T* out)
{
P(_data_sem);//P操作,消费者需要看看数据信号量是否不为空,不空才可以继续
Lock(_c_mutex);
*out = _ring[_c_step];//_c_step是消费者的位置
++_c_step;
_c_step %= _cap;
Unlock(_p_mutex);
V(_space_sem);//一个数据被拿走,消费者往后走一步,空间信号量就减少
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);]
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _ring;
int _cap;//环形队列大小
sem_t _data_sem;//只有消费者关心
sem_t _space_sem//只有生产者关心
int _c_step;//消费者位置
int _p_step;//生产者位置
pthread_mutex_t _c_mutex;//消费者之间的锁
pthread_mutex_t _p_mutex;//生产者之间的锁
}
3、多生产者多消费者模型的意义
模型的存在并不是为了从缓冲区放入和拿去,而是在放入数据前就并发构建Task,获取数据后多线程可以并发处理task,因为这些操作没有加锁。信号量的存在可以不用在临界区内部做判断,就可以知道临界资源的使用情况。是否加锁要看对应的临界资源是否被整体使用,所以多生产多消费模型就要用锁来控制相互之间的关系。
本篇gitee
下一篇写线程池。
结束。