Data Structures: A Lock-Free Queue in C++
贺志国
2023.7.11
The previous post presented several lock-free implementations of the simplest C++ data structure, the stack. A queue poses a somewhat different challenge, because Push() and Pop() operate on different ends of the container, so the synchronization requirements differ: a modification made at one end must be performed correctly and become visible at the other end. The queue therefore needs two Node pointers, head_ and tail_. Both are atomic variables so that multiple threads can access them concurrently without taking a lock. Let us start by analyzing the single-producer/single-consumer case.
1. A lock-free queue under the single-producer/single-consumer model
The single-producer/single-consumer model means that, at any given moment, at most one thread is calling Push() and at most one thread is calling Pop(). The code for this case (file named lock_free_queue.h) is as follows:
#pragma once

#include <atomic>
#include <memory>

template <typename T>
class LockFreeQueue {
 public:
  LockFreeQueue() : head_(new Node), tail_(head_.load()) {}

  ~LockFreeQueue() {
    while (Node* old_head = head_.load()) {
      head_.store(old_head->next);
      delete old_head;
    }
  }

  LockFreeQueue(const LockFreeQueue& other) = delete;
  LockFreeQueue& operator=(const LockFreeQueue& other) = delete;

  bool IsEmpty() const { return head_.load() == tail_.load(); }

  void Push(const T& data) {
    auto new_data = std::make_shared<T>(data);
    Node* p = new Node;             // 3
    Node* old_tail = tail_.load();  // 4
    old_tail->data.swap(new_data);  // 5
    old_tail->next = p;             // 6
    tail_.store(p);                 // 7
  }

  std::shared_ptr<T> Pop() {
    Node* old_head = PopHead();
    if (old_head == nullptr) {
      return std::shared_ptr<T>();
    }
    const std::shared_ptr<T> res(old_head->data);  // 2
    delete old_head;
    return res;
  }

 private:
  // If the struct definition of Node is placed in the private data member
  // section where 'head_' is defined, the following compilation error occurs:
  //
  //   error: 'Node' has not been declared ...
  //
  // This looks like a compiler bug. The struct definition of Node is therefore
  // placed in front of the private member function PopHead() to avoid the
  // error.
  struct Node {
    // Initializing the smart pointer and the raw pointer to null cannot throw.
    Node() : data(nullptr), next(nullptr) {}
    std::shared_ptr<T> data;
    Node* next;
  };

 private:
  Node* PopHead() {
    Node* old_head = head_.load();
    if (old_head == tail_.load()) {  // 1
      return nullptr;
    }
    head_.store(old_head->next);
    return old_head;
  }

 private:
  std::atomic<Node*> head_;
  std::atomic<Node*> tail_;
};
At first glance this implementation looks fine, and as long as only one thread calls Push() and only one thread calls Pop() at a time, the queue is indeed correct. The happens-before relationship between Push() and Pop() is crucial here; it is what makes it safe to retrieve the data stored in the queue. The store to tail_ ⑦ (corresponding to comment // 7 in the code above, and likewise below) synchronizes-with the load of tail_ ①; the store to the previous node's data pointer ⑤ happens-before the store to tail_; and the load of tail_ happens-before the load of the data pointer ②. Therefore the store to data happens-before the load of data, and everything works. This is a perfectly good single-producer/single-consumer (SPSC) queue.
The problems start when multiple threads call Push() or Pop() concurrently. Consider Push() first: if two threads call Push() at the same time, each allocates a new node to serve as the dummy node ③, both read the same tail_ value ④, and consequently both modify the same node, setting its data and next pointers ⑤⑥ at the same time. That is an obvious data race!
PopHead() has a similar problem. If two threads call it concurrently, both read the same head_ and both then overwrite it with the next pointer, so both threads end up holding the same node, which is a disaster. We must not only guarantee that only one Pop() thread gets access to a given item, but also that other threads that have read head_ can still safely access next in that node. This is exactly the same problem we faced with Pop() in the lock-free stack.
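To make the comparison with the lock-free stack concrete, here is a minimal sketch of a PopHead() that serializes concurrent poppers with a compare_exchange loop on head_. It deliberately ignores the hard part, safe memory reclamation (deciding when old_head may actually be deleted), which is what the reference-counting scheme in Section 2 addresses:

// Hypothetical CAS-based PopHead(): only the thread whose compare_exchange
// succeeds owns old_head; the others retry with the freshly loaded value.
// Safe reclamation of old_head (when it may be deleted) is deliberately
// ignored here.
Node* PopHead() {
  Node* old_head = head_.load();
  do {
    if (old_head == tail_.load()) {  // queue appears empty
      return nullptr;
    }
    // Reading old_head->next below is only safe if no other thread can have
    // deleted old_head yet -- exactly the reclamation problem noted above.
  } while (!head_.compare_exchange_strong(old_head, old_head->next));
  return old_head;
}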
That deals with Pop(), but what about Push()? The problem is that, to keep the required happens-before relationship between Push() and Pop(), the data item has to be stored into the dummy node before tail_ is updated. But then concurrent calls to Push() are racing on that very data pointer, because every thread has read the same tail_.
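To see how that race can be resolved without a lock, here is a rough sketch of the idea the multi-producer Push() in the next section builds on: make data an atomic T* and let producers compete for the current dummy node with a compare-and-swap from nullptr; only the winner attaches a new dummy node and advances tail_. The Node layout (std::atomic<T*> data; Node* next;), the reference counting, memory reclamation, and the logic that lets a losing thread help install the next dummy node are all omitted, so treat this as illustration only:

// Hypothetical simplified Push(): producers race to claim the dummy node by
// CAS-ing its data pointer from nullptr. Assumes Node contains
// std::atomic<T*> data and Node* next; ignores reclamation entirely.
void Push(const T& data) {
  std::unique_ptr<T> new_data = std::make_unique<T>(data);
  Node* p = new Node;  // new dummy node owned by the eventual winner
  while (true) {
    Node* old_tail = tail_.load();
    T* expected = nullptr;
    if (old_tail->data.compare_exchange_strong(expected, new_data.get())) {
      // This thread won the race for the dummy node: publish the new dummy
      // node and advance tail_.
      old_tail->next = p;
      tail_.store(p);
      new_data.release();  // ownership of the data now lives in the queue
      return;
    }
    // Another producer claimed the dummy node first; reload tail_ and retry.
  }
}

Note that a losing thread simply spins until the winner advances tail_, so this sketch is not actually lock-free; removing that wait is precisely why the full version below lets the loser install its own dummy node and help set the new tail.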
Note:
Happens-before and synchronizes-with are the two key relationships through which atomic variables synchronize between threads. Their definitions are quoted below.
Happens-before
Regardless of threads, evaluation A happens-before evaluation B if any of the following is true: 1) A is sequenced-before B; 2) A inter-thread happens before B. The implementation is required to ensure that the happens-before relation is acyclic, by introducing additional synchronization if necessary (it can only be necessary if a consume operation is involved). If one evaluation modifies a memory location, and the other reads or modifies the same memory location, and if at least one of the evaluations is not an atomic operation, the behavior of the program is undefined (the program has a data race) unless there exists a happens-before relationship between these two evaluations.
Synchronizes-with
If an atomic store in thread A is a release operation, an atomic load in thread B from the same variable is an acquire operation, and the load in thread B reads a value written by the store in thread A, then the store in thread A synchronizes-with the load in thread B. Also, some library calls may be defined to synchronize-with other library calls on other threads.
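As a minimal, self-contained illustration of these two relationships (independent of the queue itself), consider a release store and an acquire load on a flag:

#include <atomic>
#include <cassert>
#include <thread>

int payload = 0;
std::atomic<bool> ready{false};

void Producer() {
  payload = 42;                                  // A: plain (non-atomic) write
  ready.store(true, std::memory_order_release);  // B: release store
}

void Consumer() {
  while (!ready.load(std::memory_order_acquire)) {  // C: acquire load
  }
  // B synchronizes-with C, so A happens-before this read: there is no data
  // race and the assertion can never fire.
  assert(payload == 42);
}

int main() {
  std::thread t1(Producer);
  std::thread t2(Consumer);
  t1.join();
  t2.join();
  return 0;
}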
2. A multi-producer/multi-consumer lock-free queue
The code below still has bugs and needs further debugging (file named lock_free_queue.h):
#pragma once

#include <atomic>
#include <memory>

template <typename T>
class LockFreeQueue {
 public:
  LockFreeQueue() : head_(CountedNodePtr(new Node)), tail_(head_.load()) {}

  ~LockFreeQueue() {
    while (Pop()) {
      // Do nothing
    }
  }

  LockFreeQueue(const LockFreeQueue& other) = delete;
  LockFreeQueue& operator=(const LockFreeQueue& other) = delete;

  bool IsEmpty() const { return head_.load().ptr == tail_.load().ptr; }

  void Push(const T& data) {
    auto new_data = std::make_unique<T>(data);
    CountedNodePtr new_next(new Node);
    new_next.external_count = 1;
    CountedNodePtr old_tail = tail_.load();
    while (true) {
      IncreaseExternalCount(&tail_, &old_tail);
      T* old_data = nullptr;
      if (old_tail.ptr->data.compare_exchange_strong(old_data,
                                                     new_data.get())) {
        CountedNodePtr old_next = old_tail.ptr->next.load();
        if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) {
          delete new_next.ptr;
          new_next = old_next;
        }
        SetNewTail(new_next, &old_tail);
        // Release the ownership of the managed object so that the data will
        // not be deleted when the unique_ptr<T> goes out of scope.
        new_data.release();
        break;
      } else {
        CountedNodePtr old_next = old_tail.ptr->next.load();
        if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) {
          old_next = new_next;
          new_next.ptr = new Node;
        }
        SetNewTail(old_next, &old_tail);
      }
    }
  }

  std::unique_ptr<T> Pop() {
    CountedNodePtr old_head = head_.load();
    while (true) {
      IncreaseExternalCount(&head_, &old_head);
      Node* ptr = old_head.ptr;
      if (ptr == tail_.load().ptr) {
        ptr->ReleaseRef();
        return std::unique_ptr<T>();
      }
      CountedNodePtr next = ptr->next.load();
      if (head_.compare_exchange_strong(old_head, next)) {
        T* res = ptr->data.exchange(nullptr);
        FreeExternalCounter(&old_head);
        return std::unique_ptr<T>(res);
      }
      ptr->ReleaseRef();
    }
  }

 private:
  // Forward declaration
  struct Node;

  struct CountedNodePtr {
    CountedNodePtr() : ptr(nullptr), external_count(0) {}
    explicit CountedNodePtr(Node* input_ptr)
        : ptr(input_ptr), external_count(0) {}
    Node* ptr;
    int external_count;
  };

  struct NodeCounter {
    NodeCounter() : internal_count(0), external_counters(0) {}
    // external_counters occupies only 2 bits, so the maximum value it can
    // store is 3. Two bits are enough because there are at most two such
    // counters. By using a bit field for it and specifying internal_count as
    // a 30-bit value, the total counter size stays at 32 bits. This gives
    // plenty of scope for large internal count values while ensuring that
    // the whole structure fits inside a machine word on 32-bit and 64-bit
    // machines. It is important to update these counts together as a single
    // entity in order to avoid race conditions. Keeping the structure within
    // a machine word makes it more likely that the atomic operations can be
    // lock-free on many platforms.
    unsigned internal_count : 30;
    unsigned external_counters : 2;
  };

  struct Node {
    Node() : data(nullptr), counter(NodeCounter()), next(CountedNodePtr()) {
      NodeCounter new_counter;
      new_counter.internal_count = 0;
      // A new node starts with external_counters == 2: once it is in the
      // queue it is referenced both by tail_ and by the next pointer of the
      // previous node.
      new_counter.external_counters = 2;
      counter.store(new_counter);
    }

    void ReleaseRef() {
      NodeCounter old_node = counter.load();
      NodeCounter new_counter;
      do {
        new_counter = old_node;
        --new_counter.internal_count;
      } while (!counter.compare_exchange_strong(old_node, new_counter));
      if (!new_counter.internal_count && !new_counter.external_counters) {
        delete this;
      }
    }

    std::atomic<T*> data;
    std::atomic<NodeCounter> counter;
    std::atomic<CountedNodePtr> next;
  };

 private:
  static void IncreaseExternalCount(std::atomic<CountedNodePtr>* atomic_node,
                                    CountedNodePtr* old_node) {
    CountedNodePtr new_node;
    // If `*old_node` is equal to `*atomic_node`, no other thread has changed
    // `*atomic_node`, so update `*atomic_node` to `new_node`. `*atomic_node`
    // then still refers to the original node; only its `external_count` has
    // been increased by 1. If `*old_node` is not equal to `*atomic_node`,
    // another thread has changed `*atomic_node` in the meantime; update
    // `*old_node` to the current `*atomic_node` and keep looping until no
    // other thread changes `*atomic_node` during the attempt.
    do {
      new_node = *old_node;
      ++new_node.external_count;
    } while (!atomic_node->compare_exchange_strong(*old_node, new_node));
    old_node->external_count = new_node.external_count;
  }

  static void FreeExternalCounter(CountedNodePtr* old_node) {
    Node* ptr = old_node->ptr;
    const int increased_count = old_node->external_count - 2;
    NodeCounter old_counter = ptr->counter.load();
    NodeCounter new_counter;
    do {
      new_counter = old_counter;
      --new_counter.external_counters;
      new_counter.internal_count += increased_count;
    } while (!ptr->counter.compare_exchange_strong(old_counter, new_counter));
    if (!new_counter.internal_count && !new_counter.external_counters) {
      delete ptr;
    }
  }

  void SetNewTail(const CountedNodePtr& new_tail, CountedNodePtr* old_tail) {
    Node* current_tail_ptr = old_tail->ptr;
    while (!tail_.compare_exchange_weak(*old_tail, new_tail) &&
           old_tail->ptr == current_tail_ptr) {
      // Do nothing
    }
    if (old_tail->ptr == current_tail_ptr) {
      FreeExternalCounter(old_tail);
    } else {
      current_tail_ptr->ReleaseRef();
    }
  }

 private:
  std::atomic<CountedNodePtr> head_;
  std::atomic<CountedNodePtr> tail_;
};
3. Test code
Below is a simple test program that checks whether the lock-free queue works correctly (file named lock_free_queue.cpp):
#include "lock_free_queue.h"
#include <iostream>
#include <thread>
#include <vector>
namespace {
constexpr size_t kElementNum = 10;
constexpr size_t kThreadNum = 200;
} // namespace
int main() {
LockFreeQueue<int> queue;
for (size_t i = 0; i < kElementNum; ++i) {
std::cout << "The data " << i << " is pushed in the queue.\n";
queue.Push(i);
}
std::cout << "queue.IsEmpty() == " << std::boolalpha << queue.IsEmpty()
<< std::endl;
while (auto data = queue.Pop()) {
std::cout << "Current data is : " << *data << '\n';
}
std::vector<std::thread> producers1;
std::vector<std::thread> producers2;
std::vector<std::thread> consumers1;
std::vector<std::thread> consumers2;
for (size_t i = 0; i < kThreadNum; ++i) {
producers1.emplace_back(&LockFreeQueue<int>::Push, &queue, i * 10);
producers2.emplace_back(&LockFreeQueue<int>::Push, &queue, i * 20);
consumers1.emplace_back(&LockFreeQueue<int>::Pop, &queue);
consumers2.emplace_back(&LockFreeQueue<int>::Pop, &queue);
}
for (size_t i = 0; i < kThreadNum; ++i) {
producers1[i].join();
consumers1[i].join();
producers2[i].join();
consumers2[i].join();
}
return 0;
}
The CMake build configuration file CMakeLists.txt:
cmake_minimum_required(VERSION 3.0.0)
project(lock_free_queue VERSION 0.1.0)
set(CMAKE_CXX_STANDARD 17)
# If the debug option is not given, the program will not have debugging information.
SET(CMAKE_BUILD_TYPE "Debug")
add_executable(${PROJECT_NAME} ${PROJECT_NAME}.cpp)
find_package(Threads REQUIRED)
# libatomic should be linked to the program.
# Otherwise, the following link errors occur:
# /usr/include/c++/9/atomic:254: undefined reference to `__atomic_load_16'
# /usr/include/c++/9/atomic:292: undefined reference to `__atomic_compare_exchange_16'
target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT} atomic)
include(CTest)
enable_testing()
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
include(CPack)
The configuration above links the atomic library (libatomic). The reference-counting struct CountedNodePtr has two data members, Node* ptr and int external_count, which together occupy 16 bytes on a 64-bit platform (an 8-byte pointer plus a 4-byte int, padded to 16). Atomic operations on a 16-byte structure require linking against libatomic; otherwise the following link errors occur:
/usr/include/c++/9/atomic:254: undefined reference to `__atomic_load_16'
/usr/include/c++/9/atomic:292: undefined reference to `__atomic_compare_exchange_16'
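Whether these 16-byte atomic operations are genuinely lock-free depends on the platform and toolchain (GCC routes them through libatomic). A quick way to check at run time is std::atomic<T>::is_lock_free(); the following stand-alone sketch uses a hypothetical Dummy16 struct of the same size as CountedNodePtr:

#include <atomic>
#include <cstdio>

// Hypothetical stand-in with the same size as CountedNodePtr:
// an 8-byte pointer plus an 8-byte integer, i.e. 16 bytes in total.
struct Dummy16 {
  void* ptr;
  long count;
};

int main() {
  std::atomic<Dummy16> a{Dummy16{}};
  // On GCC this call itself typically requires linking libatomic (-latomic),
  // which is exactly why the CMake configuration above links `atomic`.
  std::printf("16-byte atomic is lock-free: %s\n",
              a.is_lock_free() ? "yes" : "no");
  return 0;
}

If this prints "no", the 16-byte operations fall back to a lock inside libatomic, so the queue is not truly lock-free on that platform.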
The VSCode debug launch configuration file .vscode/launch.json:
{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "cpp_gdb_launch",
      "type": "cppdbg",
      "request": "launch",
      "program": "${workspaceFolder}/build/${workspaceFolderBasename}",
      "args": [],
      "stopAtEntry": false,
      "cwd": "${fileDirname}",
      "environment": [],
      "externalConsole": false,
      "MIMode": "gdb",
      "setupCommands": [
        {
          "description": "Enable neat printing for gdb",
          "text": "-enable-pretty-printing",
          "ignoreFailures": true
        }
      ],
      // "preLaunchTask": "cpp_build_task",
      "miDebuggerPath": "/usr/bin/gdb"
    }
  ]
}
Build with CMake:
cd lock_free_queue
# Only needed the first time
mkdir build
cd build
cmake .. && make
The output is as follows:
./lock_free_queue
The data 0 is pushed in the queue.
The data 1 is pushed in the queue.
The data 2 is pushed in the queue.
The data 3 is pushed in the queue.
The data 4 is pushed in the queue.
The data 5 is pushed in the queue.
The data 6 is pushed in the queue.
The data 7 is pushed in the queue.
The data 8 is pushed in the queue.
The data 9 is pushed in the queue.
queue.IsEmpty() == false
Current data is : 0
Current data is : 1
Current data is : 2
Current data is : 3
Current data is : 4
Current data is : 5
Current data is : 6
Current data is : 7
Current data is : 8
Current data is : 9
The VSCode debugging interface looks like this: