Linux —— 生产者消费者模型
- 生产者消费者模型概述
- 生产者消费者模型特点
- 生产者消费者模型优点
- 基于BlockingQueue的生产者消费者模型
生产者消费者模型概述
生产者消费者模型是一种并发编程模型,用于解决多线程或多进程间的数据共享和同步问题。在这个模型中,有两种角色:生产者和消费者,它们通过共享的缓冲区进行通信。生产者负责生成数据并将其放入缓冲区,而消费者则从缓冲区中获取数据并进行处理。
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而共享的缓冲区进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给缓冲区,消费者不找生产者要数据,而是直接从缓冲区取,平衡了生产者和消费者的处理能力。这个缓冲区就是用来给生产者和消费者解耦的。
生产者消费者模型特点
- 保证生产者不会在缓冲区满的时候继续向缓冲区放入数据, 而消费者也不会在缓冲区空的时候,消耗数据。
- 当缓冲区满的时候,生产者会进入休眠状态,当下次消费者开始消耗缓冲区数据时,生产者才会被唤醒,开始往缓冲区中添加数据;当缓冲区空的时候,消费者也会进入休眠状态,直到生产者往缓冲区添加数据时才会被唤醒。
- 生产者和消费者之间通过队列进行通信和数据传递,使得它们可以独立进行操作。
生产者消费者模型优点
- 解耦生产者和消费者:生产者和消费者之间通过队列进行通信和数据传递,使得它们可以独立进行操作。这种解耦 提高了代码的灵活性和可维护性,因为你可以更容易地修改或替换生产者和消费者的实现而无需影响其他部分。
- 提高系统的响应性和吞吐量:生产者和消费者可以并发地工作,生产者不必等待消费者完成处理才能继续生产,消费者也不必等待生产者生成新的数据才能继续消费。这可以提高系统的响应性和吞吐量,尤其是在处理大量数据时。
- 平衡生产和消费速度:生产者消费者模型可以帮助平衡生产和消费的速度。当生产者的速度快于消费者时,数据会积累在队列中,直到消费者可以处理它们。相反,当消费者的速度快于生产者时,队列中的数据会减少,直到有新的数据生成。
- 简化并发编程:生产者消费者模型提供了一种结构化的并发编程方式,通过使用队列来处理数据传递和同步,可以避免一些常见的并发编程错误,如竞态条件、死锁等。这使得并发编程更容易理解、调试和维护。
- 支持多个生产者和消费者:生产者消费者模型可以很容易地扩展以支持多个生产者和消费者。只需使用一个共享的队列来传递数据,多个生产者可以向队列中添加数据,多个消费者可以从队列中取出数据,而无需修改原有的逻辑。
基于BlockingQueue的生产者消费者模型
其中的BlockQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的queue
进行实现。
- blockingqueue.hpp头文件,定义并实现了了阻塞队列的基本功能
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
const static int defaultCap = 5;
template <typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _block_queue.size() == _max_cap;
}
bool IsEmpty()
{
return _block_queue.empty();
}
public:
BlockQueue(int cap = defaultCap) : _max_cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
void Pop(T *out)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
// 阻塞等待
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _block_queue.front();
_block_queue.pop();
pthread_mutex_unlock(&_mutex);
// 唤醒生产者
pthread_cond_signal(&_p_cond);
}
void Equeue(const T &in)
{
pthread_mutex_lock(&_mutex);
while (IsFull())
{
// 等待
pthread_cond_wait(&_p_cond, &_mutex);
}
_block_queue.push(in);
pthread_mutex_unlock(&_mutex);
// 唤醒消费者
pthread_cond_signal(&_c_cond);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _block_queue; // 临界资源
int _max_cap; // 最大容量
pthread_mutex_t _mutex; // 锁
pthread_cond_t _p_cond; // 生产者条件变量
pthread_cond_t _c_cond; // 消费者条件变量
};
BlockQueue
类实现了一个线程安全的阻塞队列,用于生产者-消费者模型。
通过互斥量和条件变量来管理对共享队列的访问,确保在多线程环境中数据的一致性和安全性。
构造函数初始化最大容量和相关的同步机制,IsFull()
和IsEmpty()
方法用于检查队列状态。生产者通过Equeue()
方法向队列添加数据,若队列满则等待;消费者通过Pop()
方法从队列中取出数据,若队列空则等待。
此处分别使用同一把锁和两个条件变量来实现有效的同步,避免了资源的竞争和死锁。
生产者操作 (Enqueue)
- 生产者首先使用
pthread_mutex_lock()
获取互斥量_mutex
,保护对_block_queue
的访问。- 进入一个 while 循环,调用
IsFull()
检查队列是否已满。如果满了,生产者进入等待状态,调用pthread_cond_wait()
函数,传入_p_cond
和_mutex
参数。这个函数会自动释放_mutex
,并阻塞在_p_cond
条件变量上。- 当队列有空间时(不满),生产者使用
_block_queue
的push()
成员函数将数据in
推入队列。- 使用
pthread_mutex_unlock()
释放互斥量_mutex
,允许其他线程访问队列。- 调用
pthread_cond_signal()
函数,唤醒一个等待在_c_cond
条件变量上的消费者线程,通知它可以从队列中取数据了。
、
消费者操作 (Dequeue)
- 消费者使用
pthread_mutex_lock()
获取互斥量_mutex
。- 进入一个 while 循环,调用
IsEmpty()
检查队列是否为空。如果为空,消费者进入等待状态,调用pthread_cond_wait()
函数,传入_c_cond
和_mutex
参数。这个函数会自动释放_mutex,
并阻塞在_c_cond
条件变量上。- 当队列有数据时,消费者使用
_block_queue
的front()
成员函数获取队列头的数据,并存储在out
指针指向的位置。然后调用pop()
成员函数从队列中移除这个数据。- 使用
pthread_mutex_unlock()
释放互斥量_mutex
。- 调用
pthread_cond_signal()
函数,唤醒一个等待在_p_cond
条件变量上的生产者线程,通知它可以生产数据了。
- main.cpp文件,定义了生产者线程和消费者线程的执行逻辑
#include <iostream>
#include <unistd.h>
#include "BlockQueue.hpp"
#include <ctime>
void *Producer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
int val = rand() % 10 + 1;
bq->Equeue(val);
std::cout << "Producer send a num -> " << val << std::endl;
// sleep(1);
}
return nullptr;
}
void *Consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
sleep(1);
int val = 0;
bq->Pop(&val);
std::cout << "Consumer get a num -> " << val << std::endl;
sleep(1);
}
}
int main()
{
BlockQueue<int> *bq = new BlockQueue<int>();
srand(time(nullptr));
pthread_t t1, t2;
pthread_create(&t1, nullptr, Producer, bq);
pthread_create(&t2, nullptr, Consumer, bq);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
return 0;
}
- 结果:
通过模板也可以将一个Task
类传入队列中,如下修改:
- Task.hpp 定义了Task类,用于传入队列:
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task(int x = 0, int y = 0) : _x(x), _y(y), _result(0) // 初始化 _result
{
}
~Task()
{
}
void Execute() // 更正拼写
{
_result = _x + _y; // 计算结果
}
void Debug()
{
std::cout << std::to_string(_x) + " + " + std::to_string(_y) + " = " + " ? " << std::endl;
}
void PrintResult() // 新增输出结果的方法
{
std::cout << _x << " + " << _y << " = " << _result << std::endl;
}
private:
int _x;
int _y;
int _result; // 结果变量
};
- main.cc
#include <iostream>
#include <unistd.h>
#include "BlockQueue.hpp"
#include <ctime>
#include "Task.hpp"
void *Producer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
sleep(1);
int x = rand() % 10 + 1;
usleep(10000);
int y = rand() % 10 + 1;
Task t(x,y);
bq->Equeue(t);
t.Debug();
}
return nullptr;
}
void *Consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
sleep(1);
Task t;
bq->Pop(&t);
t.Execute(); // 计算结果
t.PrintResult(); // 输出结果
}
}
int main()
{
BlockQueue<Task> *bq = new BlockQueue<Task>();
srand(time(nullptr));
pthread_t t1, t2;
pthread_create(&t1, nullptr, Producer, bq);
pthread_create(&t2, nullptr, Consumer, bq);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
return 0;
}
- 结果: