文章目录
- 生产者消费者模型
- 基本概念
- 生产者消费者模型优点
- 生产者消费者模型的特点
- 基于阻塞队列的生产者消费者模型
- 阻塞队列-BlockingQueue
- C++模拟实现基于阻塞队列的生产消费模型
- BlockQueue.hpp基本框架
- 构造
- 析构
- 判空&&判满
- 从阻塞队列插入数据
- 向阻塞队列获取数据
- Cptest.cc
- 基于计算任务的生产者消费者模型
- Task.hpp
生产者消费者模型
基本概念
概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题
生产者和消费者彼此之间不直接通讯,而通过这个容器来通讯,
-
所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中
-
消费者也不用找生产者要数据,而是直接从这个容器里取数据
这个容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个容器实际上就是用来给生产者和消费者解耦
生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合
对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合
生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下
- 三种关系:
- 生产者和生产者(互斥关系)
- 消费者和消费者(互斥关系)
- 生产者和消费者(互斥关系 + 同步关系)
- 两种角色: 生产者和消费者(通常由进程或线程承担)
- 一个交易场所: 通常指的是内存中的一段缓冲区
我们用代码编写生产者消费者模型的时候,本质就是对这三个特点进行维护, 简称:321 原则
为什么上述的三种关系都存在互斥的关系
1)介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来
2)所有的生产者和消费者都会竞争申请锁,因此生产者和生产者 , 消费者和消费者, 生产者和消费者之间都存在互斥关系
为什么生产者和消费者之间为什么会存在同步关系
1)如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者就不能再生产数据
2)让消费者一直消费,那么当容器当中的数据被消费完后,消费者就不能再消费
虽然上述的操作这样不会造成数据不一致的问题, 但是这样会引起另一方的饥饿问题,是非常低效的
所以我们应该让生产者和消费者访问该容器时具有一定的顺序性 ,比如让生产者先生产一部分数据,然后再让消费者进行消费
注意: 互斥关系保证的是数据的正确性,同步关系是为了让多线程之间协同起来
基于阻塞队列的生产者消费者模型
阻塞队列-BlockingQueue
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构
其与普通的队列的区别在于:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素.
- 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出
(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
阻塞队列最典型的应用场景实际上就是管道的实现
C++模拟实现基于阻塞队列的生产消费模型
我们以单生产者、单消费者为例进行实现
其中:BlockQueue实际上就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的queue进行实现BlockQueue
Makefile
Cptest:Cptest.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f Cptest
BlockQueue.hpp基本框架
1)由于我们实现的是单生产者、单消费者的生产者消费者模型,因此我们不需要维护生产者和生产者之间的关系,也不需要维护消费者和消费者之间的关系,我们只需要维护生产者和消费者之间的同步与互斥关系即可
2)将BlockingQueue当中存储的数据进行模板化, 方便以后需要时进行复用
3)这里设置BlockingQueue存储数据的上限为default_cap
,当阻塞队列中存储了default_cap
组数据时生产者就不能进行生产了,此时生产者就应该被阻塞
4)阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁用于保护阻塞队列
5)这里我们需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满.当阻塞队列满了的时候,要进行生产的生产者线程就应该在full条件变量下进行等待;当阻塞队列为空的时候,要进行消费的消费者线程就应该在empty条件变量下进行等待
namespace Mango
{
const int default_cap = 5;
template<class T>
class BlockQueue
{
private:
std::queue<T> _bq;//阻塞队列->临界资源,需要通过加锁保护
int _cap;//阻塞队列最多可以容纳的数据个数
pthread_mutex_t _mtx;//互斥锁->保护临界资源的锁
//条件变量
//1. 当队列为满,就应该不要生产了(潜台词:不要竞争锁了),而应该让消费者来消费
//2. 当队列为空,就不应该消费(潜台词:不要竞争锁了),应该让生产者来进行生产
pthread_cond_t _full;//代表队列是满的,生产者在该条件变量下等待
pthread_cond_t _empty;//代表队列是空的,消费者在该条件变量下等待
};
}
构造
BlockQueue(int cap = default_cap) :_cap(cap)
{
pthread_mutex_init(&_mtx, nullptr);//初始化锁
//初始化条件变量
pthread_cond_init(&_empty, nullptr);
pthread_cond_init(&_full, nullptr);
}
析构
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);//释放锁
//释放条件变量
pthread_cond_destroy(&_empty);
pthread_cond_destroy(&_full);
}
判空&&判满
bool isFull()
{
return _cap == _bq.size();
}
bool isEmpty()
{
return _bq.empty();
}
不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的
如果对应条件不满足,那么对应线程就会被挂起
但此时该线程是拿着锁的,为了避免死锁问题,在调用pthread_cond_wait
函数时就需要传入当前线程手中的互斥锁,此时当该线程被挂起时就会自动释放手中的互斥锁,而当该线程被唤醒时又会自动获取到该互斥锁
从阻塞队列插入数据
1)生产者线程要向阻塞队列当中Push数据,前提是阻塞队列里面有空间
若阻塞队列已经满了,那么此时该生产者线程就需要在full条件下进行等待,直到消费者将其唤醒
2)当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据
而此时可能有消费者线程正在empty条件变量下进行等待,因此当生产者生产完数据后可以唤醒在empty条件变量下等待的消费者线程(也可以在特定条件下才唤醒消费者进程:例如:生产了一半的数据之后才唤醒)
//向阻塞队列插入数据(生产者调用)
void Push(const T& data)
{
pthread_mutex_lock(&_mtx);//加锁
while (isFull())
{
//此时不能进行生产,直到阻塞队列可以容纳新的数据
//pthread_cond_wait
//功能:1. 调用的时候,会首先自动释放mtx_!,然后再挂起自己
//功能2. 返回的时候,会首先自动竞争锁,获取到锁之后,才能返回
pthread_cond_wait(&_full, &_mtx);
}
_bq.push(data);
pthread_mutex_unlock(&_mtx);//解锁
pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
}
向阻塞队列获取数据
1)消费者线程要从阻塞队列当中Pop数据,前提是阻塞队列里面有数据
若阻塞队列为空,那么此时该消费者线程就需要在empty条件下进行等待,直到被生产者唤醒
2)当消费者消费完一个数据后,意味着阻塞队列当中至少有一个空间
而此时可能有生产者线程正在full条件变量下进行等待,因此当消费者消费完数据后可以唤醒在full条件变量下等待的生产者线程
//从阻塞队列获取数据(消费者调用)
void Pop(T* data)//输出型参数
{
pthread_mutex_lock(&_mtx);
while (isEmpty())
{
//不能进行消费,直到阻塞队列有新的数据
//pthread_cond_wait
//功能:1. 调用的时候,会首先自动释放mtx_!,然后再挂起自己
//功能2. 返回的时候,会首先自动竞争锁,获取到锁之后,才能返回!
pthread_cond_wait(&_empty, &_mtx);
}
*data = _bq.front();
_bq.pop();
pthread_mutex_unlock(&_mtx);
pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
}
注意:上述判断是否满足生产消费条件时不能用if,而应该用while
pthread_cond_wait
函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行- 在多消费者的情况下,当生产者生产了一个数据后如果使用
pthread_cond_broadcast
函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了 - 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
#include <unistd.h>
namespace Mango
{
const int default_cap = 5;
template<class T>
class BlockQueue
{
public:
bool isFull()
{
return _cap == _bq.size();
}
bool isEmpty()
{
return _bq.empty();
}
BlockQueue(int cap = default_cap) :_cap(cap)
{
pthread_mutex_init(&_mtx, nullptr);//初始化锁
//初始化条件变量
pthread_cond_init(&_empty, nullptr);
pthread_cond_init(&_full, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);//释放锁
//释放条件变量
pthread_cond_destroy(&_empty);
pthread_cond_destroy(&_full);
}
//向阻塞队列插入数据(生产者调用)
void Push(const T& data)
{
pthread_mutex_lock(&_mtx);//加锁
while (isFull())
{
//此时不能进行生产,直到阻塞队列可以容纳新的数据
pthread_cond_wait(&_full, &_mtx);
}
_bq.push(data);
pthread_mutex_unlock(&_mtx);//解锁
pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
}
//从阻塞队列获取数据(消费者调用)
void Pop(T* data)//输出型参数
{
pthread_mutex_lock(&_mtx);
while (isEmpty())
{
//不能进行消费,直到阻塞队列有新的数据
pthread_cond_wait(&_empty, &_mtx);
}
*data = _bq.front();
_bq.pop();
pthread_mutex_unlock(&_mtx);
pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
}
private:
std::queue<T> _bq;//阻塞队列
int _cap;//阻塞队列最多可以容纳的数据个数
pthread_mutex_t _mtx;//互斥锁
//条件变量
//1. 当队列为满,就应该不要生产了(潜台词:不要竞争锁了),而应该让消费者来消费
//2. 当队列为空,就不应该消费(潜台词:不要竞争锁了),应该让生产者来进行生产
pthread_cond_t _full;//代表队列是满的,生产者在该条件变量下等待
pthread_cond_t _empty;//代表队列是空的,消费者在该条件变量下等待
};
}
Cptest.cc
在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据
#include "BlockQueue.hpp"
using namespace Mango;
void* consumer(void *args)
{
BlockQueue<int> *bq = (BlockQueue<int>*)args;
while(true)//不断的消费
{
sleep(2);//消费的慢
int data = 0;
bq->Pop(&data);//输出型参数
std::cout << "消费者消费了一个数据: " << data << std::endl;
}
}
void* producter(void *args)
{
BlockQueue<int> *bq = (BlockQueue<int>*)args;
while(true)//不断的生产
{
// sleep(2);//生产者生产的慢一点
//1. 制造数据,生产者的数据(task-任务)从哪里来??
int data = rand()%20 + 1;//1~20之间的数据
bq->Push(data);
std::cout << "生产者生产数据: " << data << std::endl;
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_t c,p;//生产者, 消费者线程
//两个线程就拿到同一个阻塞队列bq
pthread_create(&c, nullptr, consumer, (void*)bq);
pthread_create(&p, nullptr, producter, (void*)bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
注意事项:阻塞队列要让生产者向队列中Push数据,让消费者从队列中Pop数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行例程的参数进行传入
生产者不停的进行生产,而消费者每隔2s进行消费
由于生产者生产的很快,运行代码后一瞬间生产者就将阻塞队列打满了,此时生产者想要再进行生产就只能在full条件变量下进行等待,直到消费者消费完一个数据后,生产者才会被唤醒进而继续进行生产,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了
生产者每隔2s进行生产,而消费者不停的进行消费
void* consumer(void *args)
{
BlockQueue<int> *bq = (BlockQueue<int>*)args;
while(true)//不断的消费
{
//sleep(2);//消费的慢
int data = 0;
bq->Pop(&data);//输出型参数
std::cout << "消费者消费了一个数据: " << data << std::endl;
}
}
void* producter(void *args)
{
BlockQueue<int> *bq = (BlockQueue<int>*)args;
while(true)//不断的生产
{
sleep(2);//生产者生产的慢一点
//1. 制造数据,生产者的数据(task-任务)从哪里来??
int data = rand()%20 + 1;//1~20之间的数据
bq->Push(data);
std::cout << "生产者生产数据: " << data << std::endl;
}
}
虽然消费者消费的很快,但一开始阻塞队列中是没有数据的,因此消费者只能在empty条件变量下进行等待,直到生产者生产完一个数据后,消费者才会被唤醒进而进行消费,消费者消费完这一个数据后又会进行等待,因此生产者和消费者的步调就是一致的
除此之外,我们可以当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产
此时让生产者生产的快,消费者消费的慢观察现象
//向阻塞队列插入数据(生产者调用)
void Push(const T& data)
{
pthread_mutex_lock(&_mtx);//加锁
while (isFull())
{
//此时不能进行生产,直到阻塞队列可以容纳新的数据
pthread_cond_wait(&_full, &_mtx);
}
_bq.push(data);
if(_bq.size()>_cap/2)
{
pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
}
pthread_mutex_unlock(&_mtx);//解锁
}
//从阻塞队列获取数据(消费者调用)
void Pop(T* data)//输出型参数
{
pthread_mutex_lock(&_mtx);
while (isEmpty())
{
//不能进行消费,直到阻塞队列有新的数据
pthread_cond_wait(&_empty, &_mtx);
}
*data = _bq.front();
_bq.pop();
if(_bq.size()< _cap/2)
{
pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
}
pthread_mutex_unlock(&_mtx);
}
生产者还是一瞬间将阻塞队列打满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于队列容器的一半时,才会唤醒生产者线程进行生产
注意:上述即可以先唤醒再解锁,也可以先解锁再唤醒,都可以
case1:已经把锁释放了,然后唤醒你,你就立马可以竞争锁
case2:先唤醒的时候,有可能我把锁释放掉了,有可能没释放,没释放也不影响,因为被唤醒也需要重新申请锁
基于计算任务的生产者消费者模型
实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,我们这样做只是为了测试代码的正确性,由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据
例如:
实现一个基于计算任务的生产者消费者模型,此时我们只需要定义一个Task类,这个类当中需要包含一个Run成员函数,该函数代表着我们想让消费者如何处理拿到的数据
Task.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace Mango
{
class Task
{
public:
Task(int x = 0, int y = 0, int op = 0)
: _x(x), _y(y), _op(op)
{}
~Task()
{}
void Run()
{
int result = 0;
switch (_op)
{
case '+':
result = _x + _y;
break;
case '-':
result = _x - _y;
break;
case '*':
result = _x * _y;
break;
case '/':
if (_y == 0)
{
std::cout << "Warning: div zero!" << std::endl;
result = -1;
}
else
{
result = _x / _y;
}
break;
case '%':
if (_y == 0)
{
std::cout << "Warning: mod zero!" << std::endl;
result = -1;
}
else
{
result = _x % _y;
}
break;
default:
std::cout << "error operation!" << std::endl;
break;
}
std::cout << _x << _op << _y << "=" << result << std::endl;
}
private:
int _x;
int _y;
char _op;//执行+-*/%
};
}
此时生产者放入阻塞队列的数据就是一个Task对象,而消费者从阻塞队列拿到Task对象后,就可以用该对象调用Run成员函数进行数据处理
单生产者,多消费者情形:
#include "BlockQueue.hpp"
#include"Task.hpp"
using namespace Mango;
void *consumer(void *args)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
while(true)
{
Task t;
bq->Pop(&t);//输出型参数->消费数据
t.Run();//处理参数
}
}
void *producter(void *args)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
std::string ops = "+-*/%";
while(true)
{
//1. 制造数据
int x = rand()%20+1; //[1,20]
int y = rand()%10+1; //[1,10]
char op = ops[rand()%5];
Task t(x, y, op);//任务,带参构造
std::cout << "生产者派发了一个任务: " << x << op << y << "=?" << std::endl;
//2. 将数据推送到任务队列中
bq->Push(t);
sleep(1);
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c,p;
pthread_t c1,c2,c3,c4;
pthread_create(&c, nullptr, consumer, (void*)bq);
pthread_create(&c1, nullptr, consumer, (void*)bq);
pthread_create(&c2, nullptr, consumer, (void*)bq);
pthread_create(&c3, nullptr, consumer, (void*)bq);
pthread_create(&c4, nullptr, consumer, (void*)bq);
pthread_create(&p, nullptr, producter, (void*)bq);
pthread_join(c, nullptr);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(c4, nullptr);
pthread_join(p, nullptr);
return 0;
}
此后我们想让生产者消费者模型处理某一种任务时,就只需要提供对应的Task类,然后让该Task类提供一个对应的Run成员函数告诉我们应该如何处理这个任务即可