学习一下别人写的,线程安全的队列代码。https://github.com/markparticle/WebServer/blob/master/code/log/blockqueue.hhttps://github.com/markparticle/WebServer/blob/master/code/log/blockqueue.h
/*
* @Author : mark
* @Date : 2020-06-16
* @copyleft Apache 2.0
*/
#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H
#include <mutex>
#include <deque>
#include <condition_variable>
#include <sys/time.h>
template<class T>
class BlockDeque {
public:
explicit BlockDeque(size_t MaxCapacity = 1000);
~BlockDeque();
void clear();
bool empty();
bool full();
void Close();
size_t size();
size_t capacity();
T front();
T back();
void push_back(const T &item);
void push_front(const T &item);
bool pop(T &item);
bool pop(T &item, int timeout);
void flush();
private:
std::deque<T> deq_;
size_t capacity_;
std::mutex mtx_;
bool isClose_;
std::condition_variable condConsumer_;
std::condition_variable condProducer_;
};
template<class T>
BlockDeque<T>::BlockDeque(size_t MaxCapacity) :capacity_(MaxCapacity) {
assert(MaxCapacity > 0);
isClose_ = false;
}
template<class T>
BlockDeque<T>::~BlockDeque() {
Close();
};
template<class T>
void BlockDeque<T>::Close() {
{
std::lock_guard<std::mutex> locker(mtx_);
deq_.clear();
isClose_ = true;
}
condProducer_.notify_all();
condConsumer_.notify_all();
};
template<class T>
void BlockDeque<T>::flush() {
condConsumer_.notify_one();
};
template<class T>
void BlockDeque<T>::clear() {
std::lock_guard<std::mutex> locker(mtx_);
deq_.clear();
}
template<class T>
T BlockDeque<T>::front() {
std::lock_guard<std::mutex> locker(mtx_);
return deq_.front();
}
template<class T>
T BlockDeque<T>::back() {
std::lock_guard<std::mutex> locker(mtx_);
return deq_.back();
}
template<class T>
size_t BlockDeque<T>::size() {
std::lock_guard<std::mutex> locker(mtx_);
return deq_.size();
}
template<class T>
size_t BlockDeque<T>::capacity() {
std::lock_guard<std::mutex> locker(mtx_);
return capacity_;
}
template<class T>
void BlockDeque<T>::push_back(const T &item) {
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.size() >= capacity_) {
condProducer_.wait(locker);
}
deq_.push_back(item);
condConsumer_.notify_one();
}
template<class T>
void BlockDeque<T>::push_front(const T &item) {
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.size() >= capacity_) {
condProducer_.wait(locker);
}
deq_.push_front(item);
condConsumer_.notify_one();
}
template<class T>
bool BlockDeque<T>::empty() {
std::lock_guard<std::mutex> locker(mtx_);
return deq_.empty();
}
template<class T>
bool BlockDeque<T>::full(){
std::lock_guard<std::mutex> locker(mtx_);
return deq_.size() >= capacity_;
}
template<class T>
bool BlockDeque<T>::pop(T &item) {
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.empty()){
condConsumer_.wait(locker);
if(isClose_){
return false;
}
}
item = deq_.front();
deq_.pop_front();
condProducer_.notify_one();
return true;
}
template<class T>
bool BlockDeque<T>::pop(T &item, int timeout) {
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.empty()){
if(condConsumer_.wait_for(locker, std::chrono::seconds(timeout))
== std::cv_status::timeout){
return false;
}
if(isClose_){
return false;
}
}
item = deq_.front();
deq_.pop_front();
condProducer_.notify_one();
return true;
}
#endif // BLOCKQUEUE_H
1,原理
利用c++ std::deque 队列,存储数据。
使用两个std::condition_variable 实现生产者和消费者。
使用std::mutex 锁,控制生产者和消费者对队列的控制权。
2, 构造函数
给isClose赋值为false,初始化队列的长度,也可以使用默认值。
3,析构函数
清空队列,设置isClose=false,并唤醒生产者和消费者。
唤醒后,如果此时 消费者或者生产者正在阻塞,则会继续进行,并且检测到了isClose_=false,则会正常退出。再使用信号量 wait 时,析构时要让阻塞去掉,然后函数正常返回。
4,加入元素
分为从前边加,和从后边加,逻辑时一样的。首先对 队列加锁,然后判断当前队列的长度是否大于设定的容量,如果超出则生产者信号量需要阻塞等待,直到消费者 消费了一个,然后继续向下执行添加逻辑。如果不大于设定的长度,则正常向队列添加一个元素,然后唤醒消费者 信号量,取消费。
5,取出元素
先给队列加锁保护,然后判断队列是否为空。如果为空,则说明队列没有元素,消费者等待。如果不为空,则正常取出,然后通知生产者继续生产。
这里还有一个函数,用来超时判断,如果队列为空,则等待一定秒数,如果这个时间过后,仍然为空,则直接返回false.
6,其它
clear() 清空队列
empty() 判断队列是否为空
full() 判断是否已满
front() 返回第一个,不会取出
back() 返回最后一个,不会取出
7,测试一下