目录
概念
代码
BlockQueue.hpp 代码:
伪唤醒!!
Thread.hpp 代码:
Task.hpp 代码:
test.cc 代码:
再次理解
概念
生产消费模型,也称为生产者-消费者问题,是计算机科学中的一个经典同步问题。它描述了两个或多个共享固定大小缓冲区的进程(或线程)——生产者和消费者之间的交互。生产者负责生成一定量的数据并将其放入缓冲区(生产),而消费者从缓冲区取出数据(消费)并进行处理。这个缓冲区就是用来给生产者和消费者解耦的。
这个模型的主要挑战在于确保生产者不会在缓冲区满时继续添加数据,同时确保消费者不会在缓冲区为空时尝试移除数据。这需要对访问共享资源(即缓冲区)进行适当的同步,以避免竞争条件。
该模型包含三种关系,分别是生产者与生产者之间的关系、消费者与消费者之间的关系、生产者与消费者之间的关系。
代码
BlockQueue.hpp 代码:
伪唤醒!!
伪唤醒(Spurious Wakeup)是指在一个多线程环境中,一个等待条件变量的线程在没有明确原因的情况下被唤醒。
伪唤醒的主要特征是:
- 无明显原因:线程被唤醒时,预期的条件并没有发生变化或者满足。
- 不可预测性:无法准确预测何时会发生伪唤醒,它们似乎是随机发生的。
- 平台依赖性:不同的操作系统或编程语言实现可能有不同的发生概率。
假设现在只有 1 个生产者,但是有 5 个消费者,现在的资源非常短缺,生产者一次生产的量不多,5 个消费者全在等待队列中等待,生产者生产了一个数据,生产者广播,把 5 个消费者全都唤醒了,5 个消费者全都去竞争互斥锁了,消费者 1 竞争成功,进行消费,消费完通知生产者生产,消费者 1 竞争锁成功时,由于代码的健壮性不足,剩下的消费者并没有回到条件变量下的等待队列去等待(因为大家都被唤醒了),而是在互斥锁下的等待队列中等待,等着锁被释放后去竞争锁,消费者 1 消费完后剩下的消费者开始竞争这把锁,但此时生产者还没有生产出数据,即阻塞队列为空,消费者 2 竞争锁成功了,只能继续向下走,根本没有判断缓冲区是否为空(因为我们写的是 if 判断),导致消费者 2 在空的阻塞队列中取数据!这是非法的!
简单来说,生产者唤醒了所有的消费者,但是只有部分消费者可以消费,剩下的消费者根本没有数据可以消费,但还是被唤醒了,这钟唤醒就是伪唤醒!
把 if 判断 改为 while 判断后,即使消费者 2 唤醒后,抢到了互斥锁,也需要判断阻塞队列是否为空,才可以去访问阻塞队列,才可以消费,如果阻塞队列为空,消费者 2 就去条件变量下的等待队列中等待,这样就不会出现伪唤醒的情况!
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include<iostream>
#include<string>
#include<queue>
#include<pthread.h>
template<class T>
class BlockQueue
{
private:
//判断阻塞队列(即缓冲区)是否为满
bool IsFull()
{
return _block_queue.size()==_cap;
}
//判断阻塞队列是否为空
bool IsEmpty()
{
return _block_queue.empty();
}
public:
BlockQueue(int cap)
:_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_consumer_cond,nullptr);
pthread_cond_init(&_productor_cond,nullptr);
}
//生产者放入数据
void Enqueue(T &in)
{
pthread_mutex_lock(&_mutex);
//阻塞队列满了,生产者线程等待
while(IsFull())
{
_productor_wait_num++;//等待的数量+1
pthread_cond_wait(&_productor_cond,&_mutex);
_productor_wait_num--;//等待的数量-1
}
//阻塞队列不是满的才可以放入数据
_block_queue.push(in);
//通知消费者线程,可以消费了
//有消费者线程在等待才唤醒
if(_consumer_wait_num>0)
pthread_cond_signal(&_consumer_cond);
pthread_mutex_unlock(&_mutex);
}
//消费者取出数据,out是输出型参数
void Pop(T *out)
{
pthread_mutex_lock(&_mutex);
//阻塞队列为空,消费者线程等待
while(IsEmpty())
{
_consumer_wait_num++;
pthread_cond_wait(&_consumer_cond,&_mutex);
_consumer_wait_num--;
}
//阻塞队列不为空,消费者可以取出数据
*out=_block_queue.front();
_block_queue.pop();
//如果生产者线程因为阻塞队列满了在等待,则唤醒生产者线程
if(_productor_wait_num>0)
pthread_cond_signal(&_productor_cond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumer_cond);
pthread_cond_destroy(&_productor_cond);
}
private:
std::queue<T> _block_queue;
int _cap;//上限
pthread_mutex_t _mutex;//互斥锁
pthread_cond_t _consumer_cond;//消费者的条件变量
pthread_cond_t _productor_cond;//生产者的条件变量
int _consumer_wait_num;//生产者在等待队列中的数量
int _productor_wait_num;//消费者在等待队列中的数量
};
#endif
Thread.hpp 代码:
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
template<typename T>
using func_t = std::function<void(T&)>;
// typedef std::function<void(const T&)> func_t;
template<typename T>
class Thread
{
public:
void Excute()
{
_func(_data);
}
public:
Thread(func_t<T> func, T &data, const std::string &name="none-name")
: _func(func), _data(data), _threadname(name), _stop(true)
{}
static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
{
Thread<T> *self = static_cast<Thread<T> *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if(!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
T &_data; // 为了让所有的线程访问同一个全局变量
func_t<T> _func;
bool _stop;
};
}
#endif
Task.hpp 代码:
#pragma once
#include<iostream>
#include<string>
#include<functional>
using Task=std::function<void()>;
test.cc 代码:
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include<ctime>
using namespace ThreadModule;
using blockqueue_t = BlockQueue<Task>;
void PrintHello()
{
std::cout << "hello world" << std::endl;
}
void Productor(blockqueue_t &bq)
{
while(true)
{
sleep(1);
Task t=PrintHello;
//放入任务
bq.Enqueue(t);
}
}
void Consumer(blockqueue_t &bq)
{
while(true)
{
//从bq中取出任务
Task t;
bq.Pop(&t);
//处理任务
t();
}
}
void StartComm(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq,func_t<blockqueue_t> func)
{
for(int i=0;i<num;i++)
{
std::string name="thread-"+std::to_string(i+1);
threads->emplace_back(func,bq,name);
threads->back().Start();
}
}
void StartProductor(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq)
{
StartComm(threads,num,bq,Productor);
}
void StartConsumer(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq)
{
StartComm(threads,num,bq,Consumer);
}
void WaitAllThread(std::vector<Thread<blockqueue_t>> &threads)
{
for(auto &thread:threads)
{
thread.Join();
}
}
int main()
{
// 需要执行的任务
blockqueue_t *bq = new blockqueue_t(5);
// 存储线程
std::vector<Thread<blockqueue_t>> threads;
StartProductor(&threads,3,*bq);
StartConsumer(&threads,1,*bq);
WaitAllThread(threads);
return 0;
}
再次理解
生产消费模型可以实现并发,但并不是在线程向阻塞队列中放入数据、取出数据这个过程体现的,这个过程是互斥的,不是并发的。
并发体现在当生产者线程放入队列时,消费者线程不一定都是在等待取出数据,不一定都在竞争锁,而可能在拿到数据之后执行各自的任务,即消费者线程可以并发地执行各自的任务。生产者线程也是同理,生产者线程不一定一直都在等待放入数据,也可能并发地在执行生产数据的任务。
系统先调度生产者线程还是消费者线程并不重要:
- 如果先调度了消费者线程,消费者判断阻塞队列为空,就会等待生产者放入数据,直到生产者线程唤醒消费者线程;
- 如果先调度了生产者线程,生产者判断阻塞队列不为满,就会向阻塞队列中放入数据。
也就是说,消费者线程的消费会按照生产者线程的步调来进行,消费者不能抢在生产者前面消费!