Linux OS:基于阻塞队列的生产者消费者模型
- 前言
- 一、阻塞队列的大致框架
- 二、生产者向阻塞队列中生产数据
- 三、消费者获取阻塞队列中数据
- 四、总体生产和消费思路及测试代码
- 4.1 单生产单消费
- 4.2 多生产多消费
- 五、所以代码
前言
阻塞队列是一种常用于实现生产者消费者模型的数据结构。和普通队列不同的时,队列存在上限,当队列未满时,往队列中插入数据的操作会被阻塞;当队列为空时,从队列中获取数据的操作也会被阻塞!
- 在Linux OS:线程封装 | RAII封装锁 | 随机数运算任务封装中,对锁进行了封装,后续加锁解锁操作都基于此!
一、阻塞队列的大致框架
对于阻塞队列BlockQueue
我们首先需要一个普通队列作为生产消费场所!阻塞队列存在上限,我们也需要指明。
消费者和生产者之间都需要向队列进行操作(插入和获取数据),即访问临界区资源。生产者和消费者并发访问临界资源时会导致多执行流数据不一致问题。所以我们需要对队列进行加锁!
对于生产者来说,空间就是资源;对于消费者来说,队列中的数据才是资源。在阻塞队列中,只要队列没有到达上限,生产者就可以一直生产;只要队列中有数据,消费者就可以一直消费数据!但存在队列未满,为空情况。此时生产者和消费者会疯狂进行加锁和解锁,访问无意义的临界资源!所以为了防止这种情况出现,我们在生产者和消费者中添加条件变量!当队列为空时,消费者阻塞等待;当队列未满时,生产者阻塞等待!并且生产者生产数据后,直接通知消费者进行消费;消费者消费完后,立即通知生产者生产。
然后我们需要在构造函数中对锁和条件变量进行初始化,析构时进行回收!具体如下:
const int defaultSize = 5; // 阻塞队列默认大小
template <class T>
class BlockQueue
{
public:
BlockQueue(int cap = defaultSize)
: _capacity(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity; // 阻塞队列上限大小
pthread_mutex_t _mutex;
pthread_cond_t _p_cond; // 生产者使用
pthread_cond_t _c_cond; // 消费者使用
};
二、生产者向阻塞队列中生产数据
生产者向临界区(队列资源)中插入数据时,先申请锁,然后判断队列中是否未满。如果队列未满,此时生产者在条件变量出进行等待,一旦消费者消费数据发送信号后就会被唤醒,继续向后执行!当生产者插入完数据后,通知消费者消费,即唤醒消费者的条件变量!最后就是是否锁资源了!
bool Full()
{
return _q.size() == _capacity;
}
void Push(T &in)
{
{
LockGuard lock(&_mutex);
// pthread_mutex_lock(&_mutex);
while (Full())
{
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in);
pthread_cond_signal(&_c_cond); // 通知消费者消费
// pthread_mutex_unlock(&_mutex);
}
}
三、消费者获取阻塞队列中数据
消费者行为和生产者类似:申请锁,判断队列中是否存在数据(不存在,消费者在条件变量处阻塞等待),获取队列数据,唤醒生产者条件变量、释放锁!
bool Empty()
{
return _q.size() == 0;
}
void Pop(T *out)
{
{
LockGuard lock(&_mutex);
// pthread_mutex_lock(&_mutex);
while (Empty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_p_cond); // 通知生成者生产
// pthread_mutex_unlock(&_mutex);
}
}
四、总体生产和消费思路及测试代码
4.1 单生产单消费
在阻塞队列BlockQueue
中我们封装了向队列中插入数据和获取数据接口,并在文章开始封装了一个随机数运算任务封装
类。
下面我们在主线程中分布创建一个生产者和消费者线程,并通过参数方式让两个线程看到同一份公共资源 —— 阻塞队列。然后生产者每隔一秒生产一个任务放到阻塞队列中(两个随机数加随机运算符构成的任务);消费者直接获取阻塞队列中的数据,并进行处理!
【测试代码】:
const std::string opers = "+-*[/{%]0";
void *Productor(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
sleep(1);
int data_x = rand() % 10;
usleep(1000);
int data_y = rand() % 10;
char op = opers[rand() % opers.size()];
Task t(data_x, data_y, op); // 向阻塞队列中生产数据
std::cout << "Productor push task: " << t.PrintTask() << std::endl;
bq->Push(t);
}
}
void *Consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
Task t;
bq->Pop(&t); // 获取数据
t(); // 对数据进行处理
std::cout << "Consumer do task done: " << t.PrintResult() << std::endl;
}
}
int main()
{
srand(time(nullptr) ^ pthread_self() ^ getpid());
pthread_t p, c;
BlockQueue<Task> *bq = new BlockQueue<Task>(); // 阻塞队列
pthread_create(&p, nullptr, Productor, (void *)bq); // 创建生产者线程
pthread_create(&c, nullptr, Consumer, (void *)bq); // 创建消费者线程
pthread_join(p, nullptr);
pthread_join(c, nullptr);
return 0;
}
【运行结果】:
4.2 多生产多消费
单生产单消费
和多生产多消费
一样,不同之处在于我们需要通过数组将生产者和消费者管理起来罢了!
【伪代码】:
int main()
{
srand(time(nullptr) ^ pthread_self() ^ getpid());
pthread_t p[3], c[2]; // 管理多生产者和多消费者
BlockQueue<Task> *bq = new BlockQueue<Task>(); // 阻塞队列
pthread_create(&p[0], nullptr, Productor, (void *)bq); // 创建生产者线程
pthread_create(&p[1], nullptr, Productor, (void *)bq); // 创建生产者线程
pthread_create(&p[2], nullptr, Productor, (void *)bq); // 创建生产者线程
pthread_create(&c[0], nullptr, Consumer, (void *)bq); // 创建消费者线程
pthread_create(&c[1], nullptr, Consumer, (void *)bq); // 创建消费者线程
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
return 0;
}
五、所以代码
gitee: Linux OS:基于阻塞队列的生产者消费者模型