文章目录
- 无锁队列
- 1. 无锁队列原理
- 1.1 多线程并发控制策略介绍
- 1.2 无锁队列概念
- 1.3 无锁队列的分类
- 1.3.1 以生产者消费者数量划分
- 1.3.2 以底层数据结构划分
- 1.3.3 侵入式与非侵入式链表队列
- 1.4 无锁队列应用场景
- 2. 无锁队列的实现
- 2.1 MPSCQueue
- 2.2 rte_ring
- 学习参考
无锁队列
本文讲述了无锁队列的原理、分类、底层数据结构、应用场景,以及两种实现方式。
1. 无锁队列原理
1.1 多线程并发控制策略介绍
-
blocking(阻塞)
阻塞是指在多线程操作共享资源时,通过锁等同步原语来协调访问。当一个线程试图访问已被其他线程持有的资源时,它会等待直到该资源被释放。
特点:通过锁等机制阻塞线程实现同步;系统内可能出现循环等待的情况,造成死锁,导致整个系统无法向前推进。
以互斥锁举例:
#include <iostream> #include <mutex> #include <thread> #include <vector> std::mutex mtx; // 全局互斥锁 int shared_resource = 0; void blockingIncrement() { std::lock_guard<std::mutex> lock(mtx); // 获取锁,阻塞其他线程 ++shared_resource; std::cout << "Blocking increment: " << shared_resource << "\n"; } int main() { std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) { threads.emplace_back(blockingIncrement); } for (auto& t : threads) { t.join(); } return 0; }
-
lock free(无锁)
无锁是一种非阻塞的并发控制方式,确保在操作过程中,至少有一个线程能够在有限的重试后完成任务。无锁操作通常依赖于原子操作,如CAS(Compare-And-Swap),在不需要显式锁的情况下保证数据的正确性。
**特点:**通常依赖CAS等原子操作访问共享数据;非阻塞、无死锁,保证系统内至少有一个线程能够向前推进并在有限时间内完成;但可能出饥饿和饿死。
举例:
#include <iostream> #include <atomic> #include <thread> #include <vector> std::atomic<int> atomic_resource{0}; void lockFreeIncrement() { int old_value = atomic_resource.load(); while (!atomic_resource.compare_exchange_weak(old_value, old_value + 1)) { // CAS失败时,old_value会被重新加载当前值,继续尝试 } std::cout << "Lock-Free increment: " << atomic_resource.load() << "\n"; } int main() { std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) { threads.emplace_back(lockFreeIncrement); } for (auto& t : threads) { t.join(); } return 0; }
-
wait free(无等待)
无等待是更严格的一种无锁设计,保证所有线程都能在有限步数内完成操作。无等待算法要求即使在最坏情况下,每个线程都能在有限的时间内获得进展,而无需重试或等待其他线程。
特点:即使在最坏的情况下,系统内每个线程都能够在有限的时间内取得进展,不会有饿死现象出现。
举例:
#include <iostream> #include <atomic> #include <thread> #include <vector> std::atomic<int> waitFreeResource{0}; void waitFreeIncrement() { int newValue = waitFreeResource.fetch_add(1) + 1; // 无等待操作 std::cout << "Wait-Free increment: " << newValue << "\n"; } int main() { std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) { threads.emplace_back(waitFreeIncrement); } for (auto& t : threads) { t.join(); } return 0; }
1.2 无锁队列概念
**无锁队列(Lock-Free Queue)**是一种在多线程环境下用于存储和访问数据的队列数据结构,避免了传统锁(如互斥锁、信号量)带来的线程阻塞问题。无锁队列通过利用原子操作(如CAS,Compare-And-Swap)来保证队列操作的安全性,使得在高并发环境中拥有更好的性能和较低的资源开销。
无锁队列的主要目标是实现”非阻塞“的数据操作。允许多个线程并发地执行入队和出队操作,不会因为等待锁释放而阻塞。
无锁队列依赖CAS原子操作来保证只有一个线程成功地更新队列队列状态,而失败的线程会重新操作,指导成功。
特点:
- 依靠原子操作CAS来保证线程安全
- 非阻塞,线程不会因为原子操作失败而陷入阻塞
- 无死锁,保证系统内至少有一个线程能够运行并完成原子操作
无锁队列的优点:
- 高性能:不需要加锁、不会引起线程阻塞和线程切换
- 避免死锁
- 低延迟
1.3 无锁队列的分类
1.3.1 以生产者消费者数量划分
根据队列中数据的生产者和消费者的数量可以划分为:
- SPSC:单生产者/单消费者队列
- SPMC:单生产者/多消费者队列
- MPSC:多生产者/单消费者队列
- MPMC:多生产者/多消费者队列
1.3.2 以底层数据结构划分
根据其底层使用的数据结构可以划分为:
-
循环数组
入队操作:
Q[head] = val; head = (head + 1) % size;
出队操作:
tail = (tail + 1) % size;
-
单链表
入队操作:
head->next = new_node; head = head->next;
出队操作:
tail = tail->next;
-
混合
入队操作:
head->queue[head_index] = val; if (++head_index == size) { head = head->next; head_index = 0; }
出队操作:
if (++tail_index == size) { tail = tail->next; tail_index = 0; }
循环数组 vs 单链表:
- 数组需要预先分配空间,可能造成空间浪费,而链表是动态增加的。
- 数组通常性能更好,因为不需要动态分配内存,并且连续的内存块使缓存命中率更高。
- 由于数组存在容量限制,当队列满时可能会返回失败或者阻塞。因此基于数组的队列通常不是严格无锁的。
- 数组适用于固定任务量、生产者与消费者数量平衡的场景;而链表适用于动态任务量,不平衡生产消费场景。
1.3.3 侵入式与非侵入式链表队列
侵入式链表队列
侵入式链表队列将链表指针嵌入到数据结构内部,即数据本身包含了链表节点的指针。例如,每个数据对象包含了指向前后节点的指针。
特点:
- 数据与链表结构合并:每个数据对象直接包含链表的指针,成为链表的一部分。
- 内存效率高:数据布局更加紧凑,减少了额外分配的开销。
- 操作性能好:数据节点在链表中的位置直接定义在对象中,操作时减少了指针的查找和解引用。
非侵入式链表队列
非侵入式链表队列在链表结构和数据对象之间进行了分离,链表节点包含一个指向数据对象的指针,而数据对象不包含链表指针。链表节点与数据对象是两个独立的对象。
特点:
- 数据与链表结构分离:链表节点不嵌入到数据对象中,链表节点通过指针指向数据对象。
- 灵活性高、复用性强:数据对象可以被多个链表共享,支持复用和不同数据结构组合使用。
1.4 无锁队列应用场景
无锁队列vs有锁队列
- 性能方面,在高并发场景下,无锁队列性能通常优于有锁队列。因为无锁队列基于原子操作实现线程安全,减少了锁的争用和线程阻塞,减少了线程上下文切换开销。在低并发场景下,锁争用较少,两者性能相近。
- 编程复杂性方面:无锁队列显然更为复杂,需要CAS原子操作、指针标记、可能出现ABA问题等。
ABA问题
ABA问题发生在一个线程检查一个变量的值并打算对其进行更新时,另一个线程在期间将该变量的值从A修改成B,又改回A。第一个线程在检查到值没有变化(仍然是A)时,会认为该变量没有被其他线程修改过,从而认为可以安全地进行操作。但实际上,变量在这段时间内可能发生了多次变更,影响了数据的一致性,导致潜在的错误。
无锁队列总是比有所队列性能高吗?
取决于以下因素:
- 并发强度:低并发场景下,锁竞争较少,锁的开销并不显著,甚至可能优于无锁队列,因为无锁队列的CAS操作仍然需要一定的开销。但当并发强度过大负载极高的时候,原子操作不断重试的开销可能超过锁竞争带来的上下文切换的开销。最终还是要通过实验确定。
- 原子操作的复杂度:复杂的操作可能需要反复尝试CAS,特别是在高并发下。这种重试可能会带来额外的性能损耗。
- 硬件对原子操作的支持度
什么时候适合使用无锁队列?
-
无锁队列:适用于高并发、对实时性有要求的场景,如任务队列、网络数据包处理、金融交易系统等。
-
高并发访问:避免线程阻塞和上下文切换,适合高吞吐量需求的应用。
-
实时性要求高:无锁队列在实时系统和嵌入式系统中较常见,减少延迟。
-
-
有锁队列:适用于低并发、对操作顺序有严格要求的场景,如日志系统、文件处理、数据库事务管理等。
-
低并发访问:锁的开销在低并发下可接受,代码实现简单。
-
顺序性和公平性要求高:锁机制可以保障每个线程的公平性,不会出现饥饿。
-
2. 无锁队列的实现
2.1 MPSCQueue
- 类型:多生产者单消费者;链表;非侵入式。
#include <atomic>
#include <utility>
template<typename T>
class MPSCQueueNonIntrusive
{
public:
MPSCQueueNonIntrusive() : _head(new Node()), _tail(_head.load(std::memory_order_relaxed))
{
Node* front = _head.load(std::memory_order_relaxed);
front->Next.store(nullptr, std::memory_order_relaxed);
}
~MPSCQueueNonIntrusive()
{
T* output;
while (Dequeue(output))
delete output;
Node* front = _head.load(std::memory_order_relaxed);
delete front;
}
// wait-free
void Enqueue(T* input)
{
Node* node = new Node(input);
Node* prevHead = _head.exchange(node, std::memory_order_acq_rel);
prevHead->Next.store(node, std::memory_order_release);
}
bool Dequeue(T*& result)
{
Node* tail = _tail.load(std::memory_order_relaxed);
Node* next = tail->Next.load(std::memory_order_acquire);
if (!next)
return false;
result = next->Data;
_tail.store(next, std::memory_order_release);
delete tail;
return true;
}
private:
struct Node
{
Node() = default;
explicit Node(T* data) : Data(data)
{
Next.store(nullptr, std::memory_order_relaxed);
}
T* Data;
std::atomic<Node*> Next;
};
std::atomic<Node*> _head;
std::atomic<Node*> _tail;
MPSCQueueNonIntrusive(MPSCQueueNonIntrusive const&) = delete;
MPSCQueueNonIntrusive& operator=(MPSCQueueNonIntrusive const&) = delete;
};
上面是一个非侵入式的多生产者单消费者无锁队列。其入队操作可以由多个生产者线程并发执行:
Node* node = new Node(input);
Node* prevHead = _head.exchange(node, std::memory_order_acq_rel);
prevHead->Next.store(node, std::memory_order_release);
exchange原子操作,将新插入的节点赋值给head,并获取到head之前保存的值prevHead,由于这一步是原子的,所以多个生产者线程并发执行也没有问题。
第二步,将prevHead的Next域设置为node。这时需要分析其与消费者线程是否存在竞态条件。消费者线程的步骤如下:
Node* tail = _tail.load(std::memory_order_relaxed);
Node* next = tail->Next.load(std::memory_order_acquire);
if (!next)
return false;
result = next->Data;
_tail.store(next, std::memory_order_release);
delete tail;
return true;
由于只支持单消费者线程,所以不用分析消费者线程之间的并发安全,只需要关注消费者于生产者之间的线程安全。
_tail指向一个伪结点,而_tail的Next域才指向队列的尾部元素,因此要先取出尾部元素并保存到next中。如果next为空,则直接返回false,这样就避免了队列为空_tail == _head时的竞态条件。当next不为空时,next可能为_head或者入队操作的中间变量prevHead,但是这都不影响之后的将next赋值给_tail的操作。因此,可以分析出生产者和消费者之间是线程安全的。
2.2 rte_ring
该无锁队列来源于dpdk
- 类型:适用于各种生产者消费者数量;循环数组;
保存无锁队列的数据结构:
struct rte_ring {
char name[RTE_RING_NAMESIZE]; /**< Name of the ring. */
int flags; /**< Flags supplied at creation. */
/** Ring producer status. */
struct prod {
uint32_t watermark; /**< Maximum items before EDQUOT. */
uint32_t sp_enqueue; /**< True, if single producer. */
uint32_t size; /**< Size of ring. */
uint32_t mask; /**< Mask (size-1) of ring. */
volatile uint32_t head; /**< Producer head. */
volatile uint32_t tail; /**< Producer tail. */
} prod __rte_cache_aligned;
/** Ring consumer status. */
struct cons {
uint32_t sc_dequeue; /**< True, if single consumer. */
uint32_t size; /**< Size of the ring. */
uint32_t mask; /**< Mask (size-1) of ring. */
volatile uint32_t head; /**< Consumer head. */
volatile uint32_t tail; /**< Consumer tail. */
#ifdef RTE_RING_SPLIT_PROD_CONS
} cons __rte_cache_aligned;
#else
} cons;
#endif
#ifdef RTE_LIBRTE_RING_DEBUG
struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
#endif
void * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here.
* not volatile so need to be careful
* about compiler re-ordering */
};
需要注意的点如下:
-
__rte_cache_aligned
其定义为:
#define __rte_cache_aligned __attribute__((__aligned__(CACHE_LINE_SIZE)))
__aligned__是一个GNU扩展属性,用于告诉编译器内存对齐的方式。这里以CACHE_LINE_SIZE(即缓存行大小,通常是64字节)个字节为界限进行对齐,可以保证结构体中每个变量彼此都不共享一个缓存行,从而在多线程并发修改这些变量时,优化性能。详细原理请查看我的上一篇博客中关于缓存行和缓存一致性协议的介绍。
-
sp_enqueue, sc_dequeue
分别用来标记生产者队列与消费者队列是否时单线程的。
创建无锁队列:
/* create the ring */
struct rte_ring *
rte_ring_create(const char *name, unsigned count, unsigned flags)
{
struct rte_ring *r;
size_t ring_size;
/* count must be a power of 2 */
if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
errno = EINVAL;
return NULL;
}
ring_size = count * sizeof(void *) + sizeof(struct rte_ring);
r = (struct rte_ring *)malloc(ring_size);
if (r != NULL) {
/* init the ring structure */
memset(r, 0, sizeof(*r));
snprintf(r->name, sizeof(r->name), "%s", name);
r->flags = flags;
r->prod.watermark = count;
r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ);
r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ);
r->prod.size = r->cons.size = count;
r->prod.mask = r->cons.mask = count-1;
r->prod.head = r->cons.head = 0;
r->prod.tail = r->cons.tail = 0;
} else {
errno = ENOBUFS;
}
return r;
}
其中struct rte_ring
中保存了一个生产者任务队列和一个消费者任务队列,注意
ring_size = count * sizeof(void *) + sizeof(struct rte_ring);
r = (struct rte_ring *)malloc(ring_size);
这两行代码说明其将循环数组与无锁队列的标识信息等保存在了一个连续内存中。
mask
变量是为了优化取模运算的性能,由于其规定count必须是2的整数次幂,因此head = (head + 1) % size
操作可以优化为head = head & (count - 1)
,所以用mask来保存count - 1
,可以进一步提高性能。
多线程入队操作
static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t prod_head, prod_next;
uint32_t cons_tail, free_entries;
const unsigned max = n;
int success;
unsigned i;
uint32_t mask = r->prod.mask;
int ret;
/* move prod.head atomically */
do {
/* Reset n to the initial burst count */
n = max;
prod_head = r->prod.head;
cons_tail = r->cons.tail;
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* prod_head > cons_tail). So 'free_entries' is always between 0
* and size(ring)-1. */
free_entries = (mask + cons_tail - prod_head);
/* check that we have enough room in ring */
if (unlikely(n > free_entries)) {
if (behavior == RTE_RING_QUEUE_FIXED) {
__RING_STAT_ADD(r, enq_fail, n);
return -ENOBUFS;
}
else {
/* No free entry available */
if (unlikely(free_entries == 0)) {
__RING_STAT_ADD(r, enq_fail, n);
return 0;
}
n = free_entries;
}
}
prod_next = prod_head + n;
success = rte_atomic32_cmpset(&r->prod.head, prod_head,
prod_next);
} while (unlikely(success == 0));
/* write entries in ring */
ENQUEUE_PTRS();
COMPILER_BARRIER();
/* if we exceed the watermark */
if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
(int)(n | RTE_RING_QUOT_EXCEED);
__RING_STAT_ADD(r, enq_quota, n);
}
else {
ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
__RING_STAT_ADD(r, enq_success, n);
}
/*
* If there are other enqueues in progress that preceeded us,
* we need to wait for them to complete
*/
while (unlikely(r->prod.tail != prod_head))
rte_pause();
r->prod.tail = prod_next;
return ret;
}
需要注意的点如下:
- 原子性地调整r->prod.head
入队操作的关键步骤是调整r->prod.head,只要能够保证这个调整操作是原子的,就能够保证多线程的线程安全。
success = rte_atomic32_cmpset(&r->prod.head, prod_head,
prod_next);
这条语句是一个CAS原子操作,如果r->prod.head == pro_head, 就将prod_next赋值给r->prod.head并返回非0值,否则返回0。这条语句保证了调整队头的线程安全性。
- 复制插入对象
接下来,将插入对象obj_table复制到刚获得的数组空间:
/* write entries in ring */
ENQUEUE_PTRS();
COMPILER_BARRIER();
- 索引值与空闲空间的计算问题
一个可能引起疑惑的点是,对于循环数组,在调整head时应该对size取模(或 & mask)才对,为什么代码中是这样的:
prod_next = prod_head + n;
在计算空闲空间是,也是似乎按照非循环的数组的方式:
free_entries = (mask + cons_tail - prod_head);
答案是,最终的取模操作被放在了最终的读取和写入操作中,也就是ENQUEUE_PTRS();
中:
#define ENQUEUE_PTRS() do { \
const uint32_t size = r->prod.size; \
uint32_t idx = prod_head & mask; \
... 省略 ...
作为32位无符号整数,prod_head如果大于2^32 - 1,则会自动从0开始。如果prod_head < cons_tail,则已插入元素的个数为2 ^ 32 + head - tail,等于head - tail的无符号表示,因此free_entries总是可以用mask + tail - head来计算。
- 线程安全地调整prod.tail
对于从旧的prod_head到prod_next之间的内存空间,由于其可以是每个入队操作私有的,所以只需要考虑不让出队操作访问即可。这是通过prod.tail实现的,只有当复制操作完成之后,才会调整r->prod.tail,从而保证出队操作不在复制期间访问这段空间。
代码中是这样保证调整prod.tail的线程安全:
while (unlikely(r->prod.tail != prod_head))
rte_pause();
r->prod.tail = prod_next;
这样prod.tail等于插入元素前的head的时候,才将调整为插入后的头部。否则,说明有其它入队线程正在执行复制操作,还没有调整prod.tail为它的局部的prod_head,这是本线程进行等待。这样可以保证prod.tail能够按照插入的顺序进行增长。
多线程出队操作
static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t cons_head, prod_tail;
uint32_t cons_next, entries;
const unsigned max = n;
int success;
unsigned i;
uint32_t mask = r->prod.mask;
/* move cons.head atomically */
do {
/* Restore n as it may change every loop */
n = max;
cons_head = r->cons.head;
prod_tail = r->prod.tail;
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* cons_head > prod_tail). So 'entries' is always between 0
* and size(ring)-1. */
entries = (prod_tail - cons_head);
/* Set the actual entries for dequeue */
if (n > entries) {
if (behavior == RTE_RING_QUEUE_FIXED) {
__RING_STAT_ADD(r, deq_fail, n);
return -ENOENT;
}
else {
if (unlikely(entries == 0)){
__RING_STAT_ADD(r, deq_fail, n);
return 0;
}
n = entries;
}
}
cons_next = cons_head + n;
success = rte_atomic32_cmpset(&r->cons.head, cons_head,
cons_next);
} while (unlikely(success == 0));
/* copy in table */
DEQUEUE_PTRS();
COMPILER_BARRIER();
/*
* If there are other dequeues in progress that preceded us,
* we need to wait for them to complete
*/
while (unlikely(r->cons.tail != cons_head))
rte_pause();
__RING_STAT_ADD(r, deq_success, n);
r->cons.tail = cons_next;
return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}
出队操作与入队操作代码和原理类似,不再多做说明。
学习参考
学习更多相关知识请参考零声 github。