目录
生产者消费者模型的概念
生产者消费者模型的特点
生产者消费者模型优点
基于BlockingQueue的生产者消费者模型
基于 BlockingQueue 的生产者消费者模型的概念
模拟实现基于阻塞队列的生产消费模型
生产者消费者模型的概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过这个容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中,消费者也不用找生产者要数据,而是直接从这个容器里取数据,这个容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个容器实际上就是用来给生产者和消费者解耦的。
生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:
- 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
- 两种角色: 生产者和消费者。(通常由进程或线程承担)
- 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)
我们用代码编写生产者消费者模型的时候,本质就是对这三个特点进行维护。
生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?
- 介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。
- 所有的生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。
生产者和消费者之间为什么会存在同步关系?
- 如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。
- 反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。
- 虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。
注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。
生产者消费者模型优点
一、提高系统效率
- 并行处理:生产者和消费者可以同时运行,充分利用系统资源,实现并行处理。例如,在一个视频处理系统中,生产者可以不断地从摄像头获取视频帧并放入缓冲区,而消费者则可以同时从缓冲区中取出视频帧进行编码和存储,大大提高了处理效率。
- 资源分配优化:通过缓冲区的调节,可以使生产者和消费者的工作节奏更加协调,避免生产者过快地生成数据而消费者无法及时处理,或者消费者等待数据的时间过长。这样可以更好地分配系统资源,提高整体性能。
二、解耦生产者和消费者
- 独立性:生产者和消费者之间通过缓冲区进行交互,彼此之间的依赖关系降低。生产者不需要知道消费者的具体实现和处理逻辑,只需要将数据放入缓冲区即可。同样,消费者也不需要关心数据是如何生产的,只需要从缓冲区中获取数据进行处理。这种解耦使得系统更加灵活,易于维护和扩展。
- 可扩展性:当需要增加生产者或消费者的数量时,只需要对相应的部分进行修改,而不会影响到整个系统的结构。例如,在一个分布式系统中,可以轻松地添加新的生产者节点来提高数据生成的速度,或者增加消费者节点来加快数据处理的能力。
三、增强系统稳定性
- 缓冲作用:缓冲区可以起到缓冲数据的作用,避免生产者和消费者之间的直接交互可能带来的冲突和错误。例如,在网络通信中,如果生产者发送数据的速度过快,而消费者接收数据的速度较慢,可能会导致数据丢失或网络拥塞。通过使用缓冲区,可以将数据暂时存储起来,等待消费者有能力处理时再进行读取,从而提高系统的稳定性。
- 错误处理:如果生产者或消费者出现故障,缓冲区可以作为一个中间层,暂时存储数据,等待故障恢复后继续处理。这样可以减少因个别组件的故障而导致整个系统崩溃的风险。
四、适用于多种场景
- 多线程编程:在多线程环境中,生产者消费者模型是一种常见的设计模式。可以使用线程来实现生产者和消费者,通过共享的缓冲区进行数据交换。这种方式可以有效地利用多核处理器的性能,提高程序的执行效率。
- 分布式系统:在分布式系统中,生产者和消费者可以分布在不同的节点上,通过网络进行通信。缓冲区可以是一个分布式队列或数据库,用于存储和传递数据。这种架构可以实现大规模的数据处理和分布式计算,适用于云计算、大数据等领域。
- 实时系统:在实时系统中,生产者消费者模型可以用于处理实时数据。生产者可以不断地生成实时数据并放入缓冲区,消费者则可以按照一定的时间间隔从缓冲区中取出数据进行处理,确保系统能够及时响应外部事件。
如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。
对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合。
基于BlockingQueue的生产者消费者模型
基于 BlockingQueue 的生产者消费者模型的概念
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
阻塞队列(Blocking Queue)其与普通的队列的区别在于:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。
- 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。
注意:看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。
模拟实现基于阻塞队列的生产消费模型
为了方便理解,我们以单生产者、单消费者为例进行实现,其中的BlockQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的queue进行实现。
BlockQueue.hpp
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>
#define NUM 5
template<class T>
class BlockQueue
{
private:
// 判断队列是否满了
bool IsFull()
{
return _q.size() == _cap;
}
// 判空
bool IsEmpty()
{
return _q.empty();
}
public:
// 构造函数
BlockQueue(int cap = NUM)
: _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
// 析构函数
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
//向阻塞队列插入数据(生产者调用)
void Push(const T& data)
{
pthread_mutex_lock(&_mutex);
while (IsFull())
{
//不能进行生产,直到阻塞队列可以容纳新的数据
pthread_cond_wait(&_full, &_mutex);
}
_q.push(data);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
}
//从阻塞队列获取数据(消费者调用)
void Pop(T& data)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
//不能进行消费,直到阻塞队列有新的数据
pthread_cond_wait(&_empty, &_mutex);
}
data = _q.front();
_q.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
}
private:
std::queue<T> _q; //阻塞队列
int _cap; //阻塞队列最大容器数据个数
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _full; // 条件变量
pthread_cond_t _empty; // 条件变量
};
BlockQueue.cpp
#include "BlockQueue.hpp"
void* Producer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
//生产者不断进行生产
while (true){
sleep(1);
int data = rand() % 100 + 1;
bq->Push(data); //生产数据
std::cout << "Producer: " << data << std::endl;
}
}
void* Consumer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
//消费者不断进行消费
while (true){
sleep(1);
int data = 0;
bq->Pop(data); //消费数据
std::cout << "Consumer: " << data << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t producer, consumer;
BlockQueue<int>* bq = new BlockQueue<int>;
//创建生产者线程和消费者线程
pthread_create(&producer, nullptr, Producer, bq);
pthread_create(&consumer, nullptr, Consumer, bq);
//join生产者线程和消费者线程
pthread_join(producer, nullptr);
pthread_join(consumer, nullptr);
delete bq;
return 0;
}
- 我们实现的是单生产者、单消费者的生产者消费者模型,因此我们不需要维护生产者和生产者之间的关系,也不需要维护消费者和消费者之间的关系,我们只需要维护生产者和消费者之间的同步与互斥关系即可。
- 将BlockingQueue当中存储的数据模板化,方便以后需要时进行复用。
- 我们设置BlockingQueue存储数据的上限为5,当阻塞队列中存储了五组数据时生产者就不能进行生产了,此时生产者就应该被阻塞。
- 阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护起来。
- 生产者线程要向阻塞队列当中Push数据,前提是阻塞队列里面有空间,若阻塞队列已经满了,那么此时该生产者线程就需要进行等待,直到阻塞队列中有空间时再将其唤醒。
//向阻塞队列插入数据(生产者调用)
void Push(const T& data)
{
pthread_mutex_lock(&_mutex);
while (IsFull())
{
//不能进行生产,直到阻塞队列可以容纳新的数据
pthread_cond_wait(&_full, &_mutex);
}
_q.push(data);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
}
- 消费者线程要从阻塞队列当中Pop数据,前提是阻塞队列里面有数据,若阻塞队列为空,那么此时该消费者线程就需要进行等待,直到阻塞队列中有新的数据时再将其唤醒。
//从阻塞队列获取数据(消费者调用)
void Pop(T& data)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
//不能进行消费,直到阻塞队列有新的数据
pthread_cond_wait(&_empty, &_mutex);
}
data = _q.front();
_q.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
}
- 因此在这里我们需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。当阻塞队列满了的时候,要进行生产的生产者线程就应该在 full条件变量下进行等待;当阻塞队列为空的时候,要进行消费的消费者线程就应该在 empty条件变量下进行等待。
- 不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的,如果对应条件不满足,那么对应线程就会被挂起。但此时该线程是拿着锁的,为了避免死锁问题,在调用pthread_cond_wait函数时就需要传入当前线程手中的互斥锁,此时当该线程被挂起时就会自动释放手中的互斥锁,而当该线程被唤醒时又会自动获取到该互斥锁。
- 当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在empty条件变量下进行等待,因此当生产者生产完数据后需要唤醒在 empty条件变量下等待的消费者线程。
- 当消费者消费完一个数据后,意味着阻塞队列当中至少有一个空间,而此时可能有生产者线程正在full条件变量下进行等待,因此当消费者消费完数据后需要唤醒在 full条件变量下等待的生产者线程。
- 判断是否满足生产消费条件时不能用if,而应该用while:
- pthread_cond_wait函数 是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。
- 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。
为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。
生产者消费者步调一致
- 代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据
- 因此运行代码后我们可以看到生产者和消费者的执行步调是一致的。
生产者生产的慢,消费者消费的快
我们可以让生产者每隔一秒进行生产,而消费者不停的进行消费
void* Producer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
while (true)
{
sleep(1);
int data = rand() % 100 + 1;
bq->Push(data); //生产数据
std::cout << "Producer: " << data << std::endl;
}
}
void* Consumer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
while (true)
{
int data = 0;
bq->Pop(data); //消费数据
std::cout << "Consumer: " << data << std::endl;
}
}
- 虽然消费者消费的很快,但一开始阻塞队列中是没有数据的
- 因此消费者只能在empty条件变量下进行等待,直到生产者生产完一个数据后
- 消费者才会被唤醒进而进行消费,消费者消费完这一个数据后又会进行等待,因此生产者和消费者的步调就是一致的。
生产者生产的快,消费者消费的慢
我们可以让生产者不停的进行生产,而消费者每隔一秒进行消费
void* Producer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
while (true)
{
int data = rand() % 100 + 1;
bq->Push(data); //生产数据
std::cout << "Producer: " << data << std::endl;
}
}
void* Consumer(void* arg)
{
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
while (true)
{
sleep(1);
int data = 0;
bq->Pop(data); //消费数据
std::cout << "Consumer: " << data << std::endl;
}
}
- 此时由于生产者生产的很快,运行代码后一瞬间生产者就将阻塞队列打满了
- 此时生产者想要再进行生产就只能在 full条件变量下进行等待,直到消费者消费完一个数据后
- 生产者才会被唤醒进而继续进行生产,生产者生产完一个数据后又会进行等待
满足某一条件时再唤醒对应的生产者或消费者
- 我们也可以当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费
- 当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产。
//向阻塞队列插入数据(生产者调用)
void Push(const T& data)
{
pthread_mutex_lock(&_mutex);
while (IsFull())
{
//不能进行生产,直到阻塞队列可以容纳新的数据
pthread_cond_wait(&_full, &_mutex);
}
_q.push(data);
if (_q.size() >= _cap / 2)
{
pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
}
pthread_mutex_unlock(&_mutex);
}
//从阻塞队列获取数据(消费者调用)
void Pop(T& data)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
//不能进行消费,直到阻塞队列有新的数据
pthread_cond_wait(&_empty, &_mutex);
}
data = _q.front();
_q.pop();
if (_q.size() <= _cap / 2)
{
pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
}
pthread_mutex_unlock(&_mutex);
}
- 我们仍然让生产者生产的快,消费者消费的慢
- 运行代码后生产者还是一瞬间将阻塞队列打满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程
- 而是当阻塞队列当中的数据小于队列容器的一半时,才会唤醒生产者线程进行生产
基于计算任务的生产者消费者模型
- 当然,实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,我们这样做只是为了测试代码的正确性。
- 由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。
例如,我们想要实现一个基于计算任务的生产者消费者模型,此时我们只需要定义一个Task类,这个类当中需要包含一个Run成员函数,该函数代表着我们想让消费者如何处理拿到的数据。
#pragma once
#include <iostream>
class Task
{
public:
Task(int x = 0, int y = 0, int op = 0)
: _x(x), _y(y), _op(op)
{}
~Task()
{}
void Run()
{
int result = 0;
switch (_op)
{
case '+':
result = _x + _y;
break;
case '-':
result = _x - _y;
break;
case '*':
result = _x * _y;
break;
case '/':
if (_y == 0){
std::cout << "Warning: div zero!" << std::endl;
result = -1;
}
else{
result = _x / _y;
}
break;
case '%':
if (_y == 0){
std::cout << "Warning: mod zero!" << std::endl;
result = -1;
}
else{
result = _x % _y;
}
break;
default:
std::cout << "error operation!" << std::endl;
break;
}
std::cout << _x << _op << _y << "=" << result << std::endl;
}
private:
int _x;
int _y;
char _op;
};
此时生产者放入阻塞队列的数据就是一个Task对象,而消费者从阻塞队列拿到Task对象后,就可以用该对象调用Run成员函数进行数据处理。
void* Producer(void* arg)
{
BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
const char* arr = "+-*/%";
//生产者不断进行生产
while (true){
int x = rand() % 100;
int y = rand() % 100;
char op = arr[rand() % 5];
Task t(x, y, op);
bq->Push(t); //生产数据
std::cout << "producer task done" << std::endl;
}
}
void* Consumer(void* arg)
{
BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
//消费者不断进行消费
while (true){
sleep(1);
Task t;
bq->Pop(t); //消费数据
t.Run(); //处理数据
}
}
此后我们想让生产者消费者模型处理某一种任务时,就只需要提供对应的Task类,然后让该Task类提供一个对应的Run成员函数告诉我们应该如何处理这个任务即可。