目录
1 生活中的例子
2 为何要使用生产者消费者模型
3 生产者和消费者模型的特点
优点
4 如何理解生产消费模型提高了效率?
5 基于BlockingQueue(阻塞队列)的生产者消费者模型
C++ queue模拟阻塞队列的生产消费模型
1 生活中的例子
存在多个消费者,消费者对于商品的一次消费是小量的而且时间是不确定的,供货商一次生产的商品是大量的且时间是确定的(在工人的上班时间才能操作机械生产)。消费者没有能力也没有必要一次性购入多个相同的商品,供货商多次生产量小商品的成本比一次生产大量商品的成本高;消费者一般都在城区里生活,供货商的厂房一般都远离城区,所以消费者不方便到郊区购买产品。为了解决以上种种苦难,中间商——超市就出来了!在城区里面建立超市,对供货商,大批运入大量的产品;对消费者,可以提供多种多样的产品。生产者不需要等消费者消费的时候才生产产品,消费者消费的时候不用等生产者生产产品。通过超市,将生产者和消费者之间进行了解耦合,提高了效率,解决了忙闲不不均的问题!
2 为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
3 生产者和消费者模型的特点
首先,交易场所必须先被所有的线程看到(生产者和消费者线程)!注定了交易场所一定是一个会被多线程并发访问的公共区域,注定了多线程一定要保护共享资源的安全,注定了,在这种情况下,程序员一定要自己维护线程互斥与同步的关系!
问题:如何维护线程的互斥和同步的关系?--通过生产消费模型中,各个角色之间的关系!
优点
- 解耦
- 支持并发
- 支持忙闲不均
4 如何理解生产消费模型提高了效率?
首先明确的是,高效并不体现在从仓库中放入数据和提取数据中,因为这是串发的!
其次,我们要明确的是,生产者生产数据的过程可能漫长且独立的,消费者消费(处理)数据的时候可能漫长且独立的。
所以,提高的效率就体现在产生数据和处理数据上,因为可以多线程产出的数据,多线程处理数据,并发进行。生产数据和处理数据之间不需要相互等待,直接从仓库中存拿即可。
5 基于BlockingQueue(阻塞队列)的生产者消费者模型
BlockingQueue
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞(利用管道进行进程间通信就是这个原理)
C++ queue模拟阻塞队列的生产消费模型
#include<iostream>
#include<queue>
#include<pthread.h>
const int N=5;
template<class T>
class BlockQueue
{
private:
void lock()
{
pthread_mutex_lock(&_mutex);
}
void unlock()
{
pthread_mutex_unlock(&_mutex);
}
bool isFull()
{
return _q.size()==_capacity;
}
bool isEmpty()
{
return _q.empty();
}
void pthreadWait(pthread_cond_t& cond)
{
pthread_cond_wait(&cond,&_mutex);
}
void pthreadWakeUp(pthread_cond_t& cond)
{
pthread_cond_signal(&cond);
}
public:
BlockQueue(const int num=N):_capacity(num)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_comsumer,nullptr);
pthread_cond_init(&_producer,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_comsumer);
pthread_cond_destroy(&_producer);
}
void push(const T& task)
{
lock();
while(isFull())//保证任何时候,要生产时仓库有空余
{
pthreadWait(_producer);
}
//向仓库运数据
_q.push(task);
//有了产品,唤醒消费者消费
pthreadWakeUp(_comsumer);
unlock();
}
void pop(T* out)
{
lock();
while(isEmpty())
{
pthreadWait(_comsumer);
}
//仓库有产品,开始消费
*out=_q.front();
_q.pop();
//消费完产品,仓库有空余,唤醒生产者生产
pthreadWakeUp(_producer);
unlock();
}
private:
std::queue<T> _q;//队列存储数据,被多线程看到,需要被保护
int _capacity;
pthread_mutex_t _mutex;//保护队列,一个锁,一个共享资源,就可以保证互斥关系的成立
//两个条件变量,保证消费者和生产者之间的同步关系
pthread_cond_t _comsumer;//控制消费速度
pthread_cond_t _producer;//控制生产速度
};
main.cc测试代码
#include<iostream>
#include"blockQueue.hpp"
#include"task.hpp"
using namespace std;
#include<ctime>
#include<pthread.h>
#include<unistd.h>
std::string ops="+-*/";
void* consumerRountine(void* args)
{
BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);
while(true)
{
//1. 从队列中拿取数据
Task t;
bq->pop(&t);
//2. 处理数据,完成消费工作
t();
cout<<pthread_self()<<" | "<<t.formatArg()<<t.formatRes()<<endl;
}
return nullptr;
}
void* producerRountine(void* args)
{
BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);
while(true)
{
//1. 生产数据
int x=rand()%10;
int y=rand()%10;
char op=ops[rand()%ops.size()];
Task t(x,y,op);
//2. 将数据推送到blockqueue的结构中,完成生产工作
bq->push(t);
cout<<pthread_self()<<" | "<<t.formatArg()<<"?"<<endl;
}
return nullptr;
}
int main()
{
srand((uint64_t)time(nullptr)^getpid());
pthread_t c[3],p[2];
BlockQueue<Task> bq;
pthread_create(&c[0],nullptr,consumerRountine,&bq);
pthread_create(&c[1],nullptr,consumerRountine,&bq);
pthread_create(&c[2],nullptr,consumerRountine,&bq);
pthread_create(&p[0],nullptr,producerRountine,&bq);
pthread_create(&p[1],nullptr,producerRountine,&bq);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(c[2],nullptr);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
return 0;
}
// void* consumerRountine(void* args)
// {
// BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
// while(true)
// {
// //1. 从队列中拿取数据
// int data=0;
// bq->pop(&data);
// //2. 处理数据,完成消费工作
// cout<<pthread_self()<<"| consumerRountine done, "<<"data: "<<data<<endl;
// }
// return nullptr;
// }
// void* producerRountine(void* args)
// {
// BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
// while(true)
// {
// sleep(1);
// //1. 生产数据
// int data=rand()%10;
// //2. 将数据推送到blockqueue的结构中,完成生产工作
// bq->push(data);
// cout<<pthread_self()<<"| push data,"<<"data:"<<data<<endl;
// }
// return nullptr;
// }
// int main()
// {
// srand((uint64_t)time(nullptr)^getpid());
// pthread_t c,p;
// BlockQueue<int> bq;
// pthread_create(&c,nullptr,consumerRountine,&bq);
// pthread_create(&p,nullptr,producerRountine,&bq);
// pthread_join(c,nullptr);
// pthread_join(p,nullptr);
// return 0;
// }