文章目录
- 生产者消费者模型介绍
- 生产者消费者模型的特点
- 基于BlockingQueue的生产者消费者模型
生产者消费者模型介绍
生产者消费模型是一种常见的多线程编程模式,广泛应用于解决并发编程中的数据共享和任务调度问题。在该模型中,我们将生产数据并放入缓冲区的线程称为生产者线程,反之,从缓冲区中获取数据的线程就称为消费者线程。生产者和消费者通过共享的缓冲区进行通信,并且使用同步机制(如互斥锁和条件变量)来协调操作,防止数据竞争和资源浪费。
缓冲区是指一个用来存放数据的数据结构,通常是一个队列。 为了便于记忆,可以将生产者消费模型看成是由1个共享区(缓冲区),2个角色(生产者和消费者)和3种关系(生产者和生产者,消费者和生产者,消费者和消费者)组成。
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型的特点
- 解耦:生产者和消费者不直接接触,彼此解耦,独立性强
- 效率高:通过缓冲区和同步机制,避免了忙等待
- 灵活性:可以通过调整缓冲区大小和线程数量来优化系统性能。
基于BlockingQueue的生产者消费者模型
BlockingQueue本质就是一个队列,常用来实现生产者消费者模型。在队列为空时获取元素的消费者线程就会被阻塞,直到队列中被生产者放入数据并唤醒阻塞的消费者线程。同样的,如果队列为满,那么此时不能再生产数据,生产者线程就会阻塞,直到消费者从队列中拿走数据并唤醒等待的生产者线程。
下面我们用代码模拟生产者消费者模型:
- blockingqueue.hpp头文件,定义并实现了了阻塞队列的基本功能
#pragma once
#include <iostream>
#include <functional>
#include <string>
#include <queue>
#include <pthread.h>
using namespace std;
const static int defaultcap = 5;
template <class T>
class BlockingQueue
{
private:
// 判断是否为空
bool IsEmpty()
{
return _block_queue.empty();
}
// 判断是否为满
bool IsFull()
{
return _block_queue.size() == _maxcap;
}
public:
// 构造
BlockingQueue(int cap = defaultcap)
: _maxcap(cap)
{
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_p_cond, NULL);
pthread_cond_init(&_c_cond, NULL);
}
// 弹出元素,即消费者取出元素
void Pop(T *out)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{ // 队列为空没有元素,线程进入等待
pthread_cond_wait(&_c_cond, &_mutex);
}
// 此时队列一定有元素
*out = _block_queue.front();
_block_queue.pop();
pthread_cond_signal(&_p_cond); // 唤醒一个生产者
pthread_mutex_unlock(&_mutex);
}
// 入队列,生产者生产元素
void Push(const T &val)
{
pthread_mutex_lock(&_mutex);
while (IsFull())
{ // 队列为满,生产者线程进入等待
pthread_cond_wait(&_p_cond, &_mutex);
}
// 此时队列一定不为满
_block_queue.push(val);
pthread_cond_signal(&_c_cond); // 唤醒一个消费者
pthread_mutex_unlock(&_mutex);
}
// 析构
~BlockingQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
queue<T> _block_queue; // 底层容器存储临界资源
int _maxcap; // 队列最大容量
pthread_mutex_t _mutex; // 互斥锁,实现同步
pthread_cond_t _p_cond; // 生产者条件变量
pthread_cond_t _c_cond; // 消费者条件变量
};
- main.cpp文件,定义了生产者线程和消费者线程的执行逻辑
#include <iostream>
#include "blockingqueue.hpp"
#include <unistd.h>
#include <ctime>
using namespace std;
void *Consumer(void *arg)
{
BlockingQueue<int> *bq = static_cast<BlockingQueue<int> *>(arg); // 类型转换
while (true)
{
int val = 0;
bq->Pop(&val);
cout << "消费者取出数据->" << val << endl;
sleep(1);
}
}
void *Produce(void *arg)
{
BlockingQueue<int> *bq = static_cast<BlockingQueue<int> *>(arg); // 类型转换
while (true)
{
int val = rand() % 100;
bq->Push(val);
cout << "生产者生产数据->" << val << endl;
sleep(1);
}
}
int main()
{
BlockingQueue<int> *bq = new BlockingQueue<int>();
srand(time(nullptr));
pthread_t t1, t2;
pthread_create(&t1, NULL, Consumer, bq);
pthread_create(&t2, NULL, Produce, bq);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
return 0;
}
- 运行结果