参考链接:恋恋风辰官方博客
并发队列&线程安全栈
代码结构:
并发队列ThreadSafeQueue.h:
#pragma once
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
// FIFO queue guarded by a single mutex.
//
// Every operation takes `mut`; consumers that call wait_and_pop() sleep on
// `data_cond` until push() signals that an element is available.
template<typename T>
class threadsafe_queue
{
public:
    threadsafe_queue() {}

    // Appends new_value to the back of the queue and wakes one waiting
    // consumer (if any).
    void push(T new_value)
    {
        std::lock_guard<std::mutex> guard(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }

    // Blocks until the queue is non-empty, then move-assigns the front
    // element into `value` and removes it.
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> guard(mut);
        while (data_queue.empty())
        {
            data_cond.wait(guard);
        }
        value = std::move(data_queue.front());
        data_queue.pop();
    }

    // Blocks until the queue is non-empty, then returns the front element
    // moved into a freshly allocated shared_ptr.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> guard(mut);
        while (data_queue.empty())
        {
            data_cond.wait(guard);
        }
        std::shared_ptr<T> popped = std::make_shared<T>(std::move(data_queue.front()));
        data_queue.pop();
        return popped;
    }

    // Non-blocking pop into `value`. Returns false (leaving `value`
    // untouched) when the queue is empty, true otherwise.
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> guard(mut);
        const bool has_item = !data_queue.empty();
        if (has_item)
        {
            value = std::move(data_queue.front());
            data_queue.pop();
        }
        return has_item;
    }

    // Non-blocking pop. Returns a null shared_ptr when the queue is empty.
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> guard(mut);
        std::shared_ptr<T> popped;
        if (!data_queue.empty())
        {
            popped = std::make_shared<T>(std::move(data_queue.front()));
            data_queue.pop();
        }
        return popped;
    }

    // Snapshot of emptiness taken under the lock; another thread may change
    // the answer as soon as the lock is released.
    bool empty() const
    {
        std::lock_guard<std::mutex> guard(mut);
        return data_queue.empty();
    }

private:
    mutable std::mutex mut;             // mutable so empty() const can lock it
    std::queue<T> data_queue;
    std::condition_variable data_cond;  // signalled once per push()
};
// FIFO queue guarded by a single mutex that stores its elements as
// std::shared_ptr<T>.
//
// The element is allocated in push() before the mutex is taken, so the
// shared_ptr-returning pops only hand over an existing pointer and perform no
// allocation while holding the lock.
template<typename T>
class threadsafe_queue_ptr
{
private:
    mutable std::mutex mut;                     // mutable so empty() const can lock it
    std::queue<std::shared_ptr<T>> data_queue;
    std::condition_variable data_cond;          // signalled once per push()
public:
    threadsafe_queue_ptr()
    {}

    // Blocks until an element is available, then move-assigns it into
    // `value` and removes it from the queue.
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        value = std::move(*data_queue.front());
        data_queue.pop();
    }

    // Non-blocking pop into `value`. Returns false when the queue is empty.
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(*data_queue.front());
        data_queue.pop();
        return true;
    }

    // Blocks until an element is available and returns the stored pointer.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        // Move the handle out instead of copying it: the slot is popped on
        // the next line, so a copy would only add a refcount inc/dec pair.
        std::shared_ptr<T> res = std::move(data_queue.front());
        data_queue.pop();
        return res;
    }

    // Non-blocking pop. Returns a null shared_ptr when the queue is empty.
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res = std::move(data_queue.front());
        data_queue.pop();
        return res;
    }

    // Appends new_value and wakes one waiting consumer (if any).
    void push(T new_value)
    {
        // Allocate before locking so the critical section stays short.
        std::shared_ptr<T> data(
            std::make_shared<T>(std::move(new_value)));
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(data));
        data_cond.notify_one();
    }

    // Snapshot of emptiness taken under the lock; it may be stale as soon as
    // the lock is released.
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};
// FIFO queue implemented as a singly linked list with separate head and tail
// mutexes.
//
// The list always ends in a dummy node: the queue is empty exactly when
// head.get() == tail. push() fills the current dummy node and appends a new
// dummy behind it. Consumers serialise on head_mutex and only take tail_mutex
// briefly (inside get_tail()) to compare against the tail pointer.
template<typename T>
class threadsafe_queue_ht
{
private:
    struct node
    {
        std::shared_ptr<T> data;     // null in the trailing dummy node
        std::unique_ptr<node> next;
    };
    mutable std::mutex head_mutex;   // mutable so empty() const can lock it
    std::unique_ptr<node> head;
    mutable std::mutex tail_mutex;
    node* tail;                      // non-owning; points at the dummy node
    std::condition_variable data_cond;

    // Reads the tail pointer under tail_mutex.
    node* get_tail() const
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    // Detaches and returns the head node. The caller must hold head_mutex and
    // must have checked that the queue is not empty.
    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }

    // Blocks until the queue is non-empty and returns the still-held lock on
    // head_mutex.
    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
        // Plain return: a local is moved (or elided) automatically.
        return head_lock;
    }

    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }

    std::unique_ptr<node> wait_pop_head(T& value)
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        // Assign before unlinking: if the assignment throws, the node is
        // still in the queue.
        value = std::move(*head->data);
        return pop_head();
    }

    // Returns a null pointer when the queue is empty.
    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if (head.get() == get_tail())
        {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }

    // Returns a null pointer (leaving `value` untouched) when the queue is
    // empty.
    std::unique_ptr<node> try_pop_head(T& value)
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if (head.get() == get_tail())
        {
            return std::unique_ptr<node>();
        }
        value = std::move(*head->data);
        return pop_head();
    }
public:
    // Starts with a single dummy node shared by head and tail.
    threadsafe_queue_ht() :
        head(new node), tail(head.get()) {}

    // Releases the nodes one at a time. Letting the unique_ptr chain destroy
    // itself would recurse once per node and can exhaust the call stack when
    // the queue is long.
    ~threadsafe_queue_ht()
    {
        while (head)
        {
            std::unique_ptr<node> rest = std::move(head->next);
            head = std::move(rest);
        }
    }

    threadsafe_queue_ht(const threadsafe_queue_ht& other) = delete;
    threadsafe_queue_ht& operator=(const threadsafe_queue_ht& other) = delete;

    // Blocks until an element is available and returns it.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> const old_head = wait_pop_head();
        return old_head->data;
    }

    // Blocks until an element is available and move-assigns it into `value`.
    void wait_and_pop(T& value)
    {
        std::unique_ptr<node> const old_head = wait_pop_head(value);
    }

    // Non-blocking pop. Returns a null shared_ptr when the queue is empty.
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> old_head = try_pop_head();
        return old_head ? old_head->data : std::shared_ptr<T>();
    }

    // Non-blocking pop into `value`. Returns false when the queue is empty.
    bool try_pop(T& value)
    {
        std::unique_ptr<node> const old_head = try_pop_head(value);
        return static_cast<bool>(old_head);
    }

    // Snapshot of emptiness; it may be stale as soon as the locks are
    // released.
    bool empty() const
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head.get() == get_tail());
    }

    // Appends new_value and wakes one waiting consumer (if any).
    void push(T new_value)
    {
        // Allocate outside the locks to keep the critical section short.
        std::shared_ptr<T> new_data(
            std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data = std::move(new_data);
            node* const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        {
            // Consumers evaluate the wait predicate while holding head_mutex
            // and only release it atomically when they block. Passing through
            // head_mutex here means a consumer either has not yet checked the
            // predicate (and will see the new tail) or is already blocked
            // (and will receive the notification). Without this step the
            // notification could fire between the check and the block and be
            // lost. tail_mutex is already released, so there is no
            // lock-order inversion with get_tail().
            std::lock_guard<std::mutex> head_lock(head_mutex);
        }
        data_cond.notify_one();
    }
};
线程安全栈ThreadSafeStack.h :
#pragma once
#include <exception>
#include <mutex>
#include <stack>
#include <condition_variable>
// Exception type thrown when pop() is called on an empty threadsafe_stack.
struct empty_stack : std::exception
{
    // Defined inline so this header is self-contained: a declared-only
    // virtual function leaves the type without a definition to link against.
    // `noexcept` replaces the deprecated dynamic specification `throw()`.
    const char* what() const noexcept override
    {
        return "empty stack";
    }
};
// LIFO stack guarded by a single mutex.
//
// There is no separate top(): pop() checks for emptiness, reads the top and
// removes it under one lock, and throws empty_stack when nothing is stored.
template<typename T>
class threadsafe_stack
{
public:
    threadsafe_stack() {}

    // Copies the elements of `other` while holding other's mutex.
    threadsafe_stack(const threadsafe_stack& other)
    {
        std::lock_guard<std::mutex> guard(other.m);
        data = other.data;
    }

    threadsafe_stack& operator=(const threadsafe_stack&) = delete;

    // Pushes new_value onto the top of the stack.
    void push(T new_value)
    {
        std::lock_guard<std::mutex> guard(m);
        data.push(std::move(new_value));
    }

    // Removes the top element and returns it in a freshly allocated
    // shared_ptr. Throws empty_stack if the stack is empty. The element is
    // only removed after the allocation has succeeded.
    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> guard(m);
        if (data.empty())
        {
            throw empty_stack();
        }
        std::shared_ptr<T> top_item = std::make_shared<T>(std::move(data.top()));
        data.pop();
        return top_item;
    }

    // Move-assigns the top element into `value` and removes it.
    // Throws empty_stack if the stack is empty.
    void pop(T& value)
    {
        std::lock_guard<std::mutex> guard(m);
        if (data.empty())
        {
            throw empty_stack();
        }
        value = std::move(data.top());
        data.pop();
    }

    // Snapshot of emptiness taken under the lock; it may be stale as soon as
    // the lock is released.
    bool empty() const
    {
        std::lock_guard<std::mutex> guard(m);
        return data.empty();
    }

private:
    std::stack<T> data;
    mutable std::mutex m;   // mutable so const members can lock it
};
// LIFO stack guarded by a single mutex, with blocking and non-blocking pops.
//
// wait_and_pop() sleeps on `cv` until push() signals that an element is
// available; try_pop() returns immediately.
template<typename T>
class threadsafe_stack_waitable
{
public:
    threadsafe_stack_waitable() {}

    // Copies the elements of `other` while holding other's mutex.
    threadsafe_stack_waitable(const threadsafe_stack_waitable& other)
    {
        std::lock_guard<std::mutex> guard(other.m);
        data = other.data;
    }

    threadsafe_stack_waitable& operator=(const threadsafe_stack_waitable&) = delete;

    // Pushes new_value and wakes one waiting consumer (if any).
    void push(T new_value)
    {
        std::lock_guard<std::mutex> guard(m);
        data.push(std::move(new_value));
        cv.notify_one();
    }

    // Blocks until the stack is non-empty, then returns the top element moved
    // into a freshly allocated shared_ptr.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> guard(m);
        while (data.empty())
        {
            cv.wait(guard);
        }
        std::shared_ptr<T> top_item = std::make_shared<T>(std::move(data.top()));
        data.pop();
        return top_item;
    }

    // Blocks until the stack is non-empty, then move-assigns the top element
    // into `value` and removes it.
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> guard(m);
        while (data.empty())
        {
            cv.wait(guard);
        }
        value = std::move(data.top());
        data.pop();
    }

    // Snapshot of emptiness taken under the lock; it may be stale as soon as
    // the lock is released.
    bool empty() const
    {
        std::lock_guard<std::mutex> guard(m);
        return data.empty();
    }

    // Non-blocking pop into `value`. Returns false (leaving `value`
    // untouched) when the stack is empty.
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> guard(m);
        const bool has_item = !data.empty();
        if (has_item)
        {
            value = std::move(data.top());
            data.pop();
        }
        return has_item;
    }

    // Non-blocking pop. Returns a null shared_ptr when the stack is empty.
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> guard(m);
        std::shared_ptr<T> top_item;
        if (!data.empty())
        {
            top_item = std::make_shared<T>(std::move(data.top()));
            data.pop();
        }
        return top_item;
    }

private:
    std::stack<T> data;
    mutable std::mutex m;         // mutable so const members can lock it
    std::condition_variable cv;   // signalled once per push()
};
实现一个测试类TestClass.h :
#pragma once
#include<iostream>
// Small copyable and movable value type used to exercise the containers.
// Wraps a single int and can be streamed to a std::ostream.
class TestClass
{
public:
    TestClass(int data) :_data(data) {}

    // All copy/move operations are defaulted together. Declaring only the
    // copy and move constructors would leave copy assignment implicitly
    // deleted, which makes the type unusable with pop overloads that assign
    // into a caller-supplied T&.
    TestClass(const TestClass&) = default;
    TestClass(TestClass&&) noexcept = default;
    TestClass& operator=(const TestClass&) = default;
    TestClass& operator=(TestClass&&) noexcept = default;
    ~TestClass() = default;

    // Writes the wrapped integer to `os`.
    friend std::ostream& operator << (std::ostream& os, const TestClass& mc)
    {
        os << mc._data;
        return os;
    }
private:
    int _data;
};
主函数负责测试代码:
#include <iostream>
#include <thread>
#include <mutex>
#include "ThreadSafeQueue.h"
#include "ThreadSafeStack.h"
#include "TestClass.h"
std::mutex mtx_cout;
void PrintTestClass(std::string consumer, std::shared_ptr<TestClass> data)
{
std::lock_guard<std::mutex> lock(mtx_cout);
std::cout << consumer << " pop data success , data is " << (*data) << std::endl;
}
// Exercises threadsafe_stack_waitable with one producer and two consumers.
//
// The producer pushes kItemCount elements. The consumers share a budget of
// exactly kItemCount pops, so every thread terminates and the joins return.
void TestThreadSafeStack()
{
    const int kItemCount = 100;
    threadsafe_stack_waitable<TestClass> stack;

    // Number of pops not yet claimed by a consumer; guarded by claim_mtx.
    std::mutex claim_mtx;
    int unclaimed = kItemCount;

    // Reserves the right to pop one element. A consumer must hold a
    // reservation before it blocks in wait_and_pop(); because reservations
    // equal pushes, no consumer is left waiting for an element that will
    // never arrive.
    auto claim_one = [&]() -> bool
    {
        std::lock_guard<std::mutex> guard(claim_mtx);
        if (unclaimed == 0)
        {
            return false;
        }
        --unclaimed;
        return true;
    };

    auto consume = [&](const char* name)
    {
        while (claim_one())
        {
            std::shared_ptr<TestClass> data = stack.wait_and_pop();
            PrintTestClass(name, data);
        }
    };

    std::thread consumer1([&]() { consume("consumer1"); });
    std::thread consumer2([&]() { consume("consumer2"); });
    std::thread producer([&]()
        {
            for (int i = 0; i < kItemCount; i++)
            {
                TestClass mc(i);
                stack.push(std::move(mc));
            }
        });
    consumer1.join();
    consumer2.join();
    producer.join();
}
// Exercises threadsafe_queue_ptr with one producer and two consumers.
//
// The producer pushes kItemCount elements. The consumers share a budget of
// exactly kItemCount pops, so every thread terminates and the joins return.
void TestThreadSafeQue()
{
    const int kItemCount = 100;
    threadsafe_queue_ptr<TestClass> safe_que;

    // Number of pops not yet claimed by a consumer; guarded by claim_mtx.
    std::mutex claim_mtx;
    int unclaimed = kItemCount;

    // Reserves the right to pop one element. A consumer must hold a
    // reservation before it blocks in wait_and_pop(); because reservations
    // equal pushes, no consumer is left waiting for an element that will
    // never arrive.
    auto claim_one = [&]() -> bool
    {
        std::lock_guard<std::mutex> guard(claim_mtx);
        if (unclaimed == 0)
        {
            return false;
        }
        --unclaimed;
        return true;
    };

    auto consume = [&](const char* name)
    {
        while (claim_one())
        {
            std::shared_ptr<TestClass> data = safe_que.wait_and_pop();
            PrintTestClass(name, data);
        }
    };

    std::thread consumer1([&]() { consume("consumer1"); });
    std::thread consumer2([&]() { consume("consumer2"); });
    std::thread producer([&]()
        {
            for (int i = 0; i < kItemCount; i++)
            {
                TestClass mc(i);
                safe_que.push(std::move(mc));
            }
        });
    consumer1.join();
    consumer2.join();
    producer.join();
}
// Exercises threadsafe_queue_ht with one producer and two consumers.
//
// The producer pushes kItemCount elements. The consumers share a budget of
// exactly kItemCount pops, so the consumer loops are bounded instead of
// spinning forever on wait_and_pop().
void TestThreadSafeQueHt()
{
    const int kItemCount = 100;
    threadsafe_queue_ht<TestClass> safe_que;

    // Number of pops not yet claimed by a consumer; guarded by claim_mtx.
    std::mutex claim_mtx;
    int unclaimed = kItemCount;

    // Reserves the right to pop one element. A consumer must hold a
    // reservation before it blocks in wait_and_pop(), so the number of
    // blocking pops never exceeds the number of pushes.
    auto claim_one = [&]() -> bool
    {
        std::lock_guard<std::mutex> guard(claim_mtx);
        if (unclaimed == 0)
        {
            return false;
        }
        --unclaimed;
        return true;
    };

    auto consume = [&](const char* name)
    {
        while (claim_one())
        {
            std::shared_ptr<TestClass> data = safe_que.wait_and_pop();
            PrintTestClass(name, data);
        }
    };

    std::thread consumer1([&]() { consume("consumer1"); });
    std::thread consumer2([&]() { consume("consumer2"); });
    std::thread producer([&]()
        {
            for (int i = 0; i < kItemCount; i++)
            {
                TestClass mc(i);
                safe_que.push(std::move(mc));
            }
        });
    consumer1.join();
    consumer2.join();
    producer.join();
}
// Runs the three container tests in sequence, then prints a completion line.
int main()
{
    // 1. Thread-safe stack.
    TestThreadSafeStack();

    // 2. Thread-safe queue, single-mutex variant.
    TestThreadSafeQue();

    // 3. Thread-safe queue, head/tail-mutex variant.
    TestThreadSafeQueHt();

    std::cout << "Finished! \n";
    return 0;
}
生产中直接应用即可。