文章目录
- 设计无锁的并发数据结构
- 定义及意义
- 无阻塞数据结构
- 无锁数据结构
- 无等待数据结构
- 无锁结构的利弊
- 无锁数据结构的例子
- 无锁线程安全栈
- 使用风险指针检测不可回收的节点
- 使用引用计数
- 无锁栈上的内存模型
- 实现一个无锁的线程安全队列
- 设计无锁数据结构的指导建议
设计无锁的并发数据结构
细粒度有锁并发结构可能会导致死锁问题,为了避免死锁问题。使用内存序来设计并发结构是另一种解决方案。
定义及意义
用mutex, condition variable, future去同步的数据结构与算法被称为阻塞数据结构与算法。程序调用库函数使线程阻塞(block),OS将线程挂起,直至被另一线程解除阻塞(unblock)。
当程序不使用这些库函数使线程阻塞和解除阻塞,称为无阻塞结构。但是,无阻塞结构并不是都是无锁的!!!
无阻塞数据结构
使用std::atomic_flag
制作自旋锁
class spinlock_mutex{
std::atomic_flag flag;
public:
spinlock_mutex():
flag(ATOMIC_FLAG_INIT){ }
void lock(){ while(flag.test_and_set(std::memory_order_acquire)); }
void unlock(){ flag.clear(std::memory_order_release); }
};
自旋锁没调用阻塞函数,但是并不是无锁结构,仍然是个锁。
无锁结构需要更具体的定义:
- 无阻碍(Obstruction-Free)——如果其他线程都暂停了,任何给定线程将在有限步骤内完成操作。
- 无锁(Lock-Free)——如果多个线程对一个数据结构进行操作,其中一个线程将在有限步骤内完成其操作。
- 无等待(Wait-Free)——即使有其他线程也在对该数据结构进行操作,每个线程都将在有限步骤内完成操作。
由于所有其他线程不是都暂停,通常无阻塞算法被视为失败的无锁结构。
无锁数据结构
无锁结构意味着多线程可以并发的访问数据结构做不同的操作,但不能做相同的操作。如果有线程被调度器挂起,其他线程无需等待仍然能够继续完成操作。
无锁数据结构的算法常在循环中使用**“比较/交换”操作。如果有其他线程完成了对数据的修改,那该线程需要在“比较/交换”操作之前重做部分操作。如果其他线程挂起而此线程的“比较/交换”操作成功执行,这段代码是无锁**的;反之,需要需要使用自旋锁,这段代码是“无阻塞-有锁”的。
无锁算法中的循环会让一些线程处于“饥饿”状态。如有线程在“错误”时间执行,那么第一个线程将会不停的尝试所要完成的操作(其他程序继续执行)。“无锁-无等待”数据结构的出现,就为了避免这种问题。
无等待数据结构
无等待数据结构是一个无锁数据结构,且每个线程的数据访问均在有限步骤内完成。
由于可能会和其他线程的行为冲突,算法会进行了若干次尝试,因此无法做到无等待。本章的大多数例子都有一种特性——对compare_exchange_weak或compare_exchange_strong操作进行循环,并且循环次数没有上限。操作系统对线程进行进行管理,有些线程的循环次数非常多,有些线程的循环次数就非常少。因此,这些操作是无等待的。
因此,真正实现无锁无等待结构十分困难,需要有足够的理由且收益>成本,再去实现。
无锁结构的利弊
使用无锁结构的主要原因:最大化并发;鲁棒性;没有任何锁(可能存在活锁)。另外,当不能限制访问数据结构的线程数量时,就需要注意不变量的状态,或选择替代品来保持不变量的状态。同时,还需要注意操作的顺序。为了避免未定义行为,及相关的数据竞争,必须使用原子操作对修改操作进行限制。不过,仅使用原子操作是不够的,需要确定其他线程看到的修改,是否遵循正确的顺序。
缺点:虽提高了并发访问的能力,减少了单个线程的等待时间,但是其可能会将整体性能拉低。原子操作的无锁代码要慢于无原子操作的代码,原子操作就相当于无锁数据结构中的锁。硬件必须通过同一个原子变量对线程间的数据进行同步。
提交代码之前,无论是基于锁的数据结构,还是无锁的数据结构,对性能的检查都很重要(最坏的等待时间,平均等待时间,整体执行时间或者其他指标)。
无锁数据结构的例子
无锁线程安全栈
这里使用链表作为栈的底层。
push
节点步骤:
- 创建新节点
node
。 - 让
node->next
指向head->next
。 head->next
指向node
。
问题:多线程时,步骤2和3存在条件竞争。
解决方案:步骤3使用循环+“比较/交换”操作替代。
pop
节点步骤:
- 获取
head
。 - 读取
head->next
指向的结点node
。 - 设置
head->next
指向node->next
。 - 通过
node
返回携带的数据data
。 - 删除
node
节点。
问题:
- 线程a和b同时运行步骤1;下一时刻线程a运行步骤2而线程b运行步骤5,导致线程b解引用悬空指针。
- 两个线程同时读取并返回同一个
head
值。 - 空链表时,
head->next
将引起未定义行为。 - 返回值的异常安全性。
解决方案:
- 由于
push()
在存入栈后就不会访问节点,因此只要关注pop()
的多线程访问问题。步骤5将在“待删除”链表(to_be_deleted)中添加要pop的元素,使用原子计数器记录pop()
的次数(入口+1,出口-1),计数器为1时可安全释放元素;反之,在链表尾部再添加节点。 - 操作2和3使用**“比较/交换”**操作更新
head
,失败时返回步骤1。 - 步骤1确定head不是nullptr,返回异常或bool类型。
- 返回数值使用智能指针。
template<typename T>
class lock_free_stack{
private:
struct node{
T data;
node* next;
node(T const& data_):data(std::make_shared<T>(data_)){}//智能指针分配
};
std::atomic<node*> head;
std::atomic<unsigned> threads_in_pop; // 调用pop线程计数器
std::atomic<node*> to_be_deleted; //待删除链表
void try_reclaim(node* old_head); //调用时计数器-1
static void delete_nodes(node* nodes);
void chain_pending_nodes(node* nodes);
void chain_pending_nodes(node* first,node* last);
public:
void push(T const& data){
node* const new_node=new node(data);
new_node->next=head.load();
while(!head.compare_exchange_weak(new_node->next,new_node));
}
std::shared_ptr<T> pop(){
++threads_in_pop; // 在做事之前,计数器+1
node* old_head=head.load();
while(old_head != nullptr && // 在解引用前检查old_head是否为空指针
!head.compare_exchange_weak(old_head,old_head->next));
std::shared_ptr<T> res;
if(old_head) res.swap(old_head->data);//swap移动节点(而非只是拷贝指针)
try_reclaim(old_head); // 删除数据,回收空间
return res;
}
};
void lock_free_stack::try_reclaim(node* old_head){
if(threads_in_pop==1){ // 计数器为1时
/* 声明“可删除”链表(此时,可能又有线程调用pop(),threads_in_pop+1)
* nodes_to_delete=to_be_deleted;to_be_deleted=nullptr;为一步操作 */
node* nodes_to_delete=to_be_deleted.exchange(nullptr);
if(!--threads_in_pop) // 当只有一个线程调用pop(),计数器-1
delete_nodes(nodes_to_delete); // 删除“可删除”链表的节点
else if(nodes_to_delete) // 不止一个线程调用pop()且nodes_to_delete非空
chain_pending_nodes(nodes_to_delete); // 节点放回到“待删除”链表
delete old_head; // 安全删除旧的头节点
}
else{ // 计数器不为1时
chain_pending_node(old_head); // 向“可删除”链表添加节点
--threads_in_pop; // 计数器-1
}
}
void lock_free_stack::delete_nodes(node* nodes){
while(nodes){//删除链表每个节点
node* next=nodes->next;
delete nodes;
nodes=next;
}
}
void lock_free_stack::chain_pending_nodes(node* nodes){
node* last=nodes;
while(node* const next=last->next) last=next; // 让next指针指向链表的末尾
chain_pending_nodes(nodes,last);
}
void lock_free_stack::chain_pending_nodes(node* first,node* last){
//“可删除”链表(nodes_to_delete)末尾挂载“待删除”链表(to_be_deleted)的头指针
last->next=to_be_deleted;
// 循环保证last->next正确,正确时将to_be_deleted头节点更新为nodes_to_delete
while(!to_be_deleted.compare_exchange_weak(last->next,first));
}
void chain_pending_node(node* n){
chain_pending_nodes(n,n);
}
线程C添加节点Y到to_be_deleted
链表中,即使线程B此时仍将节点Y引用作为old_head
,之后会尝试访问节点Y的next
指针。如果线程A此时删除节点,将会造成线程B发生未定义行为。
当高负荷访问栈时,该方案的链表将无限增加,会再次出现泄漏。
使用风险指针检测不可回收的节点
为了应对高负荷栈访问,将回收节点机制更换为**“风险指针”**检测机制。
当有线程去访问(其他线程)删除的对象时,会先对这个对象设置“风险指针”,而后通知其他线程。当线程想要删除一个对象,就必须检查系统中其他线程是否持有“风险指针”。当没有“风险指针”时,就可以安全删除对象。否则,就必须等待“风险指针”消失。
std::shared_ptr<T> pop(){
std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();
node* old_head=head.load();
do{
node* temp;
do{ // 循环直到将风险指针设为head指针
temp=old_head;
hp.store(old_head); // 避免设置风险指针时被删除
old_head=head.load(); // 读取最新的head指针
}while(old_head!=temp);
}
while(old_head && // 用compare_exchange_strong避免重复设置“风险指针”
!head.compare_exchange_strong(old_head,old_head->next));
hp.store(nullptr); // 当声明完成,清除风险指针
std::shared_ptr<T> res;
if(old_head){
res.swap(old_head->data);
// 在删除之前对风险指针引用的节点进行检查,是否有其他节点引用
if(outstanding_hazard_pointers_for(old_head))
reclaim_later(old_head); // 放回链表,之后再删除
else
delete old_head; // 直接删除
delete_nodes_with_no_hazards();//如链表无风险指针引用节点,可安全删除节点
}
return res;
}
get_hazard_pointer_for_current_thread()
的实现:
unsigned const max_hazard_pointers=100;
struct hazard_pointer{
std::atomic<std::thread::id> id;
std::atomic<void*> pointer;
};
hazard_pointer hazard_pointers[max_hazard_pointers];
class hp_owner{
hazard_pointer* hp;
public:
hp_owner(hp_owner const&)=delete;
hp_owner operator=(hp_owner const&)=delete;
hp_owner(): hp(nullptr){
for(unsigned i=0;i<max_hazard_pointers;++i){
std::thread::id old_id;
// 尝试获取风险指针的所有权
if( hazard_pointers[i].id.compare_exchange_strong(
old_id, std::this_thread::get_id()) ){
hp=&hazard_pointers[i];
break;
}
}
if(!hp)
throw std::runtime_error("No hazard pointers available");
}
std::atomic<void*>& get_pointer(){
return hp->pointer;
}
~hp_owner(){
hp->pointer.store(nullptr);
hp->id.store(std::thread::id());
}
};
std::atomic<void*>& get_hazard_pointer_for_current_thread(){
thread_local static hp_owner hazard; // 每个线程都有自己的风险指针
return hazard.get_pointer(); // 返回风险指针
}
outstanding_hazard_pointer_for()
的实现(查找风险指针):
bool outstanding_hazard_pointers_for(void* p){
for(unsigned i=0;i<max_hazard_pointers;++i){
if(hazard_pointers[i].pointer.load()==p) return true;
}
return false;
}
reclaim_later()
添加节点和delete_nodes_with_no_hazards()
删除节点的实现:
template<typename T>
void do_delete(void* p){ delete static_cast<T*>(p); }
struct data_to_reclaim{
void* data;
std::function<void(void*)> deleter; // 产生函数指针
data_to_reclaim* next;
template<typename T>
data_to_reclaim(T* p) : data(p), deleter(&do_delete<T>), next(0){}
~data_to_reclaim(){ deleter(data); } //调用do_delete<T>(data)
};
std::atomic<data_to_reclaim*> nodes_to_reclaim;
void add_to_reclaim_list(data_to_reclaim* node){ // 循环链表头
node->next=nodes_to_reclaim.load();
while(!nodes_to_reclaim.compare_exchange_weak(node->next,node));
}
template<typename T> // 函数模板,使用std::atomic<void*>存储风险指针
void reclaim_later(T* data){
add_to_reclaim_list(new data_to_reclaim(data)); // 创建实例放入回收链表
}
void delete_nodes_with_no_hazards(){
data_to_reclaim* current=nodes_to_reclaim.exchange(nullptr); // 回收节点
while(current){
data_to_reclaim* const next=current->next;
if(!outstanding_hazard_pointers_for(current->data)) // 检测风险指针
delete current;
else
add_to_reclaim_list(current); // 放回链表后面
current=next;
}
}
使用引用计数
之前使用两种方法实现无锁线程安全栈:一种使用引用计数,另一种使用危险指针。但在实际中很难管理。std::shared_ptr<>
内部有引用计数,一些操作是原子操作,但不保证是无锁的。大量上下文使用,使开销大。如果平台支持无锁的std::shared_ptr<>
,那么所有内存回收问题就都迎刃而解了。
使用std::experimental::atomic_shared_ptr<>
(std::atomic< std::shared_ptr<T> >
两者等价)的实现:
#include <experimental/atomic>
using namespace std;
template<typename T>
class lock_free_stack{
private:
struct node{
std::shared_ptr<T> data;
std::experimental::atomic_shared_ptr<node> next;
node(T const& data_):data(std::make_shared<T>(data_)){}
};
std::experimental::atomic_shared_ptr<node> head;
public:
void push(T const& data){
std::shared_ptr<node> const new_node=std::make_shared<node>(data);
new_node->next=head.load();
while(!head.compare_exchange_weak(new_node->next,new_node));
}
std::shared_ptr<T> pop(){
std::shared_ptr<node> old_head=head.load();
while(old_head && !head.atomic_compare_exchange_weak(
old_head,old_head->next.load()));
if (old_head){
old_head->next=std::shared_ptr<node>();
return old_head->data;
}
return std::shared_ptr<T>();
}
~lock_free_stack(){
while(pop());
}
};
如果不使用std::experimental::atomic_shared_ptr<>
,就要手动管理引用计数。可使用分离引用计数的方式。读取节点时外部计数+1,读取结束时内部计数-1,内部计数为0时删除:
template<typename T>
class lock_free_stack{
private:
struct node;
struct counted_node_ptr{ // 外部计数
int external_count;
node* ptr;
};
struct node{
std::shared_ptr<T> data;
std::atomic<int> internal_count; // 内部计数
counted_node_ptr next;
node(T const& data_):data(std::make_shared<T>(data_))
,internal_count(0){}
};
std::atomic<counted_node_ptr> head;
void increase_head_count(counted_node_ptr& old_counter){
counted_node_ptr new_counter;
do{
new_counter=old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter));
old_counter.external_count=new_counter.external_count;
}
public:
void push(T const& data){
counted_node_ptr new_node;
new_node.ptr=new node(data);
new_node.external_count=1;
new_node.ptr->next=head.load();
while(!head.compare_exchange_weak(new_node.ptr->next,new_node));
}
std::shared_ptr<T> pop(){
counted_node_ptr old_head=head.load();
for(;;){
increase_head_count(old_head);
node* const ptr=old_head.ptr; // 解引用访问节点
if(!ptr) return std::shared_ptr<T>();
if(head.compare_exchange_strong(old_head,ptr->next)){//尾节点非空
std::shared_ptr<T> res;
res.swap(ptr->data);
//external_count-2:链表删除节点-1;不从当前线程访问节点-1。
int const count_increase=old_head.external_count-2;
if(ptr->internal_count.fetch_add(count_increase) ==
-count_increase) delete ptr;
return res;//无论删不删节点,都返回值
}
else if(ptr->internal_count.fetch_sub(1)==1)//相减之后为0
delete ptr;
}
}
~lock_free_stack(){ while(pop()); }
};
目前,使用默认std::memory_order_seq_cst
内存序来规定原子操作的执行顺序。可改变内存序提高效率。
无锁栈上的内存模型
根据上一节的无锁线程安全栈,观察各个原子/非原子操作间的依赖关系,再选择最佳内存序。因此,需在不同场景不同线程的角度来观察内存序,比如入栈后出栈,需要三个重要数据片段:
counted_node_ptr
转移的数据head
。head
引用的节点node
。node->data
。
做push()
的线程,会先构造数据项,并设置head
。
做pop()
的线程,会先加载head
,再循环“比较/交换”操作且引用计数+1,读取node->next
。
在此可得非原子对象next
,为了保证读取安全,必须确定push()
线程和pop()
线程之间的先行关系(happen-before)。因为push()
函数中的原子操作只有compare_exchange_weak()
,所以由此可确定两个线程间的先行关系,调用成功必须是std::memory_order_release
或更严格的内存序。调用失败时可以持续循环下去,所以使用std::memory_order_relaxed
就够了:
void push(T const& data){
counted_node_ptr new_node;
new_node.ptr=new node(data);
new_node.external_count=1;
new_node.ptr->next=head.load(std::memory_order_relaxed)
while(!head.compare_exchange_weak(new_node.ptr->next,new_node, std::memory_order_release,std::memory_order_relaxed));
}
因为pop()
函数会使用两个compare_exchange_strong
(还有load()
和fetch_add()
),后一个会读取ptr->next
。
为了让保存先行于读取,前一个compare_exchange_strong(old_counter,new_counter)
如调用成功必须是std::memory_order_acquire
或更严格的内存序操作。如调用失败时可以持续循环,所以使用std::memory_order_relaxed
就够了:
void increase_head_count(counted_node_ptr& old_counter){
counted_node_ptr new_counter;
do{
new_counter=old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
old_counter.external_count=new_counter.external_count;
}
head.load()
的初始化并不妨碍分析,可以使用std::memory_order_relaxed
。
因为pop()
函数会再使用一个head.compare_exchange_strong(old_head,ptr->next)
。需确保push()
对于ptr->data
的保存先行于这里的读取(前面一个原子操作已确保,即使后一个的compare_exchange_strong()
操作使用std::memory_order_relaxed
)。唯一需注意的是res.swap(ptr->data)
,且无其他线程可以对同一节点进行操作(“比较/交换”操作的作用)。当compare_exchange_strong()
失败时, 使用std::memory_order_relaxed
就够了,因为前一个原子操作已经使用了std::memory_order_acquire
。
由于有acquire和release的先行与同步保证,其余线程无需设置更严格的内存序。
fetch_add()
会改变引用计数。但是任何未成功检索数据的线程都知道另一个线程使用swap()
提取数据。为避免数据竞争,要确保swap()
先行于delete
。最简单的方法:操作成功时内存序为std::memory_order_acquire
;失败时循环内存序为std::memory_order_release
。但是delete
只会使用一次,在delete
前使用load(std::memory_order_acquire)
确保先行关系即可。
template<typename T>
class lock_free_stack{
private:
struct node;
struct counted_node_ptr{ // 外部计数
int external_count;
node* ptr;
};
struct node{
std::shared_ptr<T> data;
std::atomic<int> internal_count; // 内部计数
counted_node_ptr next;
node(T const& data_):data(std::make_shared<T>(data_))
,internal_count(0){}
};
std::atomic<counted_node_ptr> head;
void increase_head_count(counted_node_ptr& old_counter){
counted_node_ptr new_counter;
do{
new_counter=old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
old_counter.external_count=new_counter.external_count;
}
public:
void push(T const& data){
counted_node_ptr new_node;
new_node.ptr=new node(data);
new_node.external_count=1;
new_node.ptr->next=head.load(std::memory_order_relaxed);
while(!head.compare_exchange_weak(new_node.ptr->next,new_node,
std::memory_order_release,std::memory_order_relaxed));
}
std::shared_ptr<T> pop(){
counted_node_ptr old_head=head.load(std::memory_order_relaxed);
for(;;){
increase_head_count(old_head);
node* const ptr=old_head.ptr; // 解引用访问节点
if(!ptr) return std::shared_ptr<T>();
if(head.compare_exchange_strong(old_head,ptr->next,
std::memory_order_relaxed)){//尾节点非空
std::shared_ptr<T> res;
res.swap(ptr->data);
//external_count-2:链表删除节点-1;不从当前线程访问节点-1。
int const count_increase=old_head.external_count-2;
if(ptr->internal_count.fetch_add(count_increase) ==
-count_increase) delete ptr;
return res;//无论删不删节点,都返回值
}
else if(ptr->internal_count.fetch_sub(1)==1){//相减之后为0
ptr->internal_count.load(std::memory_order_acquire);
delete ptr;
}
}
}
~lock_free_stack(){ while(pop()); }
};