目录
- 1 概述
- 2 实现
- 3 测试
- 3 运行
1 概述
最近研究了C++11的并发编程的线程/互斥/锁/条件变量,利用互斥/锁/条件变量实现一个支持多线程并发的阻塞队列,队列大小没有限制。
阻塞队列是一个模板类,有两个模块参数,参数1是元素类型,参数2是容器类型,可以是std::deque和std::list,默认是std::deque。入队操作没有阻塞,出队操作如果队列为空则阻塞。
其类图为:
2 实现
实现代码如下:
#ifndef BLOCK_QUEUE_H
#define BLOCK_QUEUE_H
#include <deque>
#include <mutex>
#include <condition_variable>
template<typename T, typename Sequence = std::deque<T>>
class block_queue
{
Sequence queue_;
std::mutex mutex_;
std::condition_variable cv_;
public:
typedef typename Sequence::value_type value_type;
typedef typename Sequence::size_type size_type;
typedef typename std::unique_lock<std::mutex> lock_type;
block_queue() = default;
block_queue(block_queue const&) = delete;
block_queue(block_queue&& ) = delete;
block_queue& operator = (block_queue const&) = delete;
block_queue& operator = (block_queue &&) = delete;
bool empty() const
{
lock_type lock(mutex_);
return queue_.empty();
}
size_type size() const
{
lock_type lock(mutex_);
return queue_.size();
}
void push(value_type const& value)
{
{
lock_type lock(mutex_);
queue_.push_back(value);
}
cv_.notify_one();
}
void push(value_type && value)
{
{
lock_type lock(mutex_);
queue_.push_back(std::move(value));
}
cv_.notify_one();
}
template<class... Args>
void emplace(Args&&... args)
{
{
lock_type lock(mutex_);
queue_.emplace_back(args...);
}
cv_.notify_one();
}
value_type pop()
{
lock_type lock(mutex_);
while(queue_.empty())
cv_.wait(lock);
value_type value = std::move(queue_.front());
queue_.pop_front();
return value;
}
};
#endif
说明:
- 三个入队接口:
- push(T const&) 左值入队
- push(T &&) 左值入队
- emplace() 构造参数入队
- 一个出队接口
- pop()
3 测试
基于cpptest的测试代码如下:
template<typename Sequence>
struct Function4BQ
{
block_queue<std::string, Sequence> queue;
std::mutex mutex;
int counter = 0;
void consume1(size_t n)
{
std::cerr << "\n";
for(size_t i = 0; i < n; ++i)
{
std::cerr << "I get a " << queue.pop() << std::endl;
counter++;
}
}
void consume2(size_t id)
{
std::string fruit = queue.pop();
{
std::unique_lock<std::mutex> lock(mutex);
std::cerr << "\nI get a " << fruit << " in thread(" << id << ")" << std::endl;
counter++;
}
}
void product1(std::vector<std::string> & fruits)
{
for(auto const& fruit: fruits)
queue.emplace(fruit + std::string(" pie"));
}
void product2(std::vector<std::string> & fruits)
{
for(auto const& fruit: fruits)
queue.push(fruit + std::string(" pie"));
}
void product3(std::vector<std::string> & fruits)
{
for(auto const& fruit: fruits)
queue.push(fruit);
}
};
typedef Function4BQ<std::deque<std::string>> Function4BqDeque;
typedef Function4BQ<std::list<std::string>> Function4BqList;
void BlockQueueSuite::one_to_one()
{
Function4BqDeque function;
std::vector<std::string> fruits{"Apple", "Banana", "Pear", "Plum", "Pineapple"};
std::thread threads[2];
threads[0] = std::thread(&Function4BqDeque::product1, std::ref(function), std::ref(fruits));
threads[1] = std::thread(&Function4BqDeque::consume1, std::ref(function), fruits.size());
for(auto &thread : threads)
thread.join();
TEST_ASSERT_EQUALS(fruits.size(), function.counter)
function.counter = 0;
threads[0] = std::thread(&Function4BqDeque::product2, std::ref(function), std::ref(fruits));
threads[1] = std::thread(&Function4BqDeque::consume1, std::ref(function), fruits.size());
for(auto &thread : threads)
thread.join();
TEST_ASSERT_EQUALS(fruits.size(), function.counter)
function.counter = 0;
threads[0] = std::thread(&Function4BqDeque::product3, std::ref(function), std::ref(fruits));
threads[1] = std::thread(&Function4BqDeque::consume1, std::ref(function), fruits.size());
for(auto &thread : threads)
thread.join();
TEST_ASSERT_EQUALS(fruits.size(), function.counter)
}
void BlockQueueSuite::one_to_multi()
{
Function4BqList function;
std::vector<std::string> fruits{"Apple", "Banana", "Pear", "Plum", "Pineapple"};
std::thread product;
std::vector<std::thread> consumes(fruits.size());
for(size_t i = 0; i < consumes.size(); ++i)
consumes[i] = std::thread(&Function4BqList::consume2, std::ref(function), i);
product = std::thread(&Function4BqList::product1, std::ref(function), std::ref(fruits));
product.join();
for(auto &thread : consumes)
thread.join();
TEST_ASSERT_EQUALS(fruits.size(), function.counter)
function.counter = 0;
for(size_t i = 0; i < consumes.size(); ++i)
consumes[i] = std::thread(&Function4BqList::consume2, std::ref(function), i);
product = std::thread(&Function4BqList::product2, std::ref(function), std::ref(fruits));
product.join();
for(auto &thread : consumes)
thread.join();
TEST_ASSERT_EQUALS(fruits.size(), function.counter)
function.counter = 0;
for(size_t i = 0; i < consumes.size(); ++i)
consumes[i] = std::thread(&Function4BqList::consume2, std::ref(function), i);
product = std::thread(&Function4BqList::product3, std::ref(function), std::ref(fruits));
product.join();
for(auto &thread : consumes)
thread.join();
TEST_ASSERT_EQUALS(fruits.size(), function.counter)
}
说明:
- 函数one_to_one测试一个生成者对应一个消费者(容器类型使用std::deque)。
- 函数one_to_multi测试一个生产者对应多个消费者(容器类型使用std::list)
3 运行
BlockQueueSuite: 0/2
I get a Apple pie
I get a Banana pie
I get a Pear pie
I get a Plum pie
I get a Pineapple pie
I get a Apple pie
I get a Banana pie
I get a Pear pie
I get a Plum pie
I get a Pineapple pie
I get a Apple
I get a Banana
I get a Pear
I get a Plum
I get a Pineapple
BlockQueueSuite: 1/2
I get a Apple pie in thread(0)
I get a Banana pie in thread(1)
I get a Pear pie in thread(2)
I get a Plum pie in thread(3)
I get a Pineapple pie in thread(4)
I get a Apple pie in thread(0)
I get a Banana pie in thread(1)
I get a Pear pie in thread(2)
I get a Plum pie in thread(3)
I get a Pineapple pie in thread(4)
I get a Apple in thread(0)
I get a Banana in thread(1)
I get a Pear in thread(2)
I get a Plum in thread(4)
I get a Pineapple in thread(3)
BlockQueueSuite: 2/2, 100% correct in 0.009150 seconds
Total: 2 tests, 100% correct in 0.009150 seconds
分析:
- 从结果看入队顺序和出队顺序是一致的。