思路:
如图,多个生产者线程和多个消费者线程共享同一固定大小的缓冲区,它们的生产和消费符合以下规则:
- 生产者不会在缓冲区满的时候继续向缓冲区放入数据,而消费者也不会在缓冲区空的时候,消耗数据
- 当缓冲区满的时候,生产者会进入阻塞状态,当下次消费者开始消耗缓冲区的数据时,生产者才会被唤醒,开始往缓冲区中添加数据;当缓冲区空的时候,消费者也会进入阻塞状态,直到生产者往缓冲区中添加数据时才会被唤醒
所以重点为实现一个线程安全的缓冲队列:
1、首先使用STL库中的deque作为基本容器;
2、然后定义一个固定大小的容量capacity,防止无限制扩容下去导致内存耗尽;
3、之后需要两个条件变量Productor和Consumer,模拟生产者和消费者;
4、条件变量必须得有互斥量辅助,所以还需一个互斥量mutex。
template<class T>
class BlockQueue {
public:
explicit BlockQueue(size_t maxCapacity = 1000);
~BlockQueue();
private:
std::deque<T> deq;
size_t capacity;
std::mutex mtx;
std::condition_variable Consumer;
std::condition_variable Producer;
};
主要成员函数介绍:
1、析构函数:清理队列中的所有成员,唤醒所有阻塞中的生产者、消费者线程。
template<class T>
BlockQueue<T>::~BlockQueue()
{
{
std::lock_guard<std::mutex> locker(mtx);
deq.clear();
}
Producer.notify_all();
Consumer.notify_all();
};
2、生产者线程应该调用的函数
(1)void BlockQueue<T>::push_back(const T& item)
template<class T>
void BlockQueue<T>::push_back(const T& item)
{
//使用条件变量前应先对互斥量上锁,此时选择unique_lock而非lock_guard
//因为unique_lock在上锁后允许临时解锁再加锁,而lock_guard上锁后只能在离开作用域时解锁
std::unique_lock<std::mutex> lock(mtx);
while (deq.size() >= capacity)
{
//若队列已满,则生产者线程进入阻塞状态,自动对互斥量解锁,被唤醒后又自动对互斥量加锁
Producer.wait(lock);
}
deq.push_back(item);//向队列中放入数据
Consumer.notify_one();//唤醒一个阻塞的消费者线程
}
(2)void BlockQueue<T>::push_front(const T& item)
template<class T>
void BlockQueue<T>::push_front(const T& item)
{
std::unique_lock<std::mutex> lock(mtx);
while (deq.size() >= capacity)
{
Producer.wait(lock);
}
deq.push_front(item);
Consumer.notify_one();
}
3、消费者线程应该调用的函数
(1)bool BlockQueue<T>::pop(T& item)
因为要实现线程安全的函数,所以将pop函数的接口设计为需要放入一个自己的T item作为删除数据的拷贝。
template<class T>
bool BlockQueue<T>::pop(T& item)
{
std::unique_lock<std::mutex> lock(mtx);
while (deq.empty())
{
//若队列空,则消费者线程进入阻塞,等待被唤醒
Consumer.wait(lock);
}
item = deq.front();//当作删除数据的拷贝
deq.pop_front();
Producer.notify_one();//唤醒一个阻塞的生产者线程
return true;
}
(2)bool BlockQueue<T>::pop(T& item,int timeout)
为pop()函数添加第二参数timeout,设计为带计时器的pop函数。
若队列在超时时间后仍一直为空,则立刻返回false。
template<class T>
bool BlockQueue<T>::pop(T& item,int timeout)
{
std::unique_lock<std::mutex> lock(mtx);
while (deq.empty())
{
//阻塞时间超过指定的timeout,直接返回false
if (Consumer.wait_for(lock, std::chrono::seconds(timeout)) == std::cv_status::timeout)
return false;
}
item = deq.front();
deq.pop_front();
Producer.notify_one();
return true;
}
4、一些额外的成员函数
(1)判断队列为空
template<class T>
bool BlockQueue<T>::empty()
{
std::lock_guard<std::mutex> lock(mtx);
return deq.empty();
}
(2)判断队列为满
template<class T>
bool BlockQueue<T>::full()
{
std::lock_guard<std::mutex> lock(mtx);
return deq.size()>=capacity;
}
(3)模拟队列的front函数
template<class T>
T BlockQueue<T>::front()
{
std::lock_guard<std::mutex> lock(mtx);
return deq.front();
}
(4)模拟队列的back函数
template<class T>
T BlockQueue<T>::back()
{
std::lock_guard<std::mutex> lock(mtx);
return deq.back();
}