写在前面
在看无锁队列之前,我们先来看看看队列的操作。队列是一种非常重要的数据结构,其特性是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信间经常采用队列做缓存,缓解数据处理压力。根据队列操作的场景可分为以下4种:
无锁队列应用场景:
如果你的业务,数据量不大,一秒只需要处理几百上千的数据,就没必要用无锁队列了。当需要处理的数据非常多,比如,每秒需要处理几十万条数据时,可以考虑用无锁队列。
一、有锁和无锁
实际上有锁和无锁,就是我们平时所说的乐观锁和悲观锁:
- 悲观锁:一种悲观的加锁策略,它认为每次访问共享资源的时候,总会发生冲突,所以宁愿牺牲性能(时间)来保证数据安全;
- 乐观锁:是一种乐观的策略,它假设线程访问共享资源不会发生冲突,所以不需要加锁,因此线程将不断执行,不需要停止。一旦碰到冲突,就重试当前操作直到没有冲突为止。
无锁通过(CAS Compare And Swap)技术来实现,CAS是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作,从⽽避免多线程同时改写某⼀数据时由于执⾏顺序不确定性以及中断的不可预知性产⽣的数据不⼀致问题。
bool CAS( int * pAddr, int nExpected, int nNew )
{
if ( *pAddr == nExpected ) {
*pAddr = nNew ;
return true ;
}
else{
return false ;
}
}
工作原理:将pAddr地址中的元素与nExpected比较,如果相等,则更新pAddr的值为nNew,并返回true;否则返回false。
二、无锁队列的优势
上面我们提到,在数据量小的时候直接采用加锁的方式,实现资源的排他性访问即可。但是加锁的缺点也很明显:
- CPU会将大量的时间用在锁的维护上,而不是数据处理;
- 在线程之间切换的时候,导致Cache中的数据失效。CPU访问Cache的速度是远大于内存的,所以需要尽量减少线程频繁切换 。
三、无锁队列实现
3.1 zmq实现
zmq实现无锁队列,只要在ypipe.hpp 和 yqueue.hpp 两个文件,适用于一读一写的场景。
3.1.1 yqueue.hpp
template <typename T, int N>
class yqueue_t
{
......
private:
// Individual memory chunk to hold N elements.
// 链表结点称之为chunk_t
struct chunk_t
{
T values[N]; //每个chunk_t可以容纳N个T类型的元素,以后就以一个chunk_t为单位申请内存
chunk_t *prev;
chunk_t *next;
};
chunk_t *begin_chunk; // 链表头结点
int begin_pos; // 起始点
chunk_t *back_chunk; // 队列中最后一个元素所在的链表结点
int back_pos; // 尾部
chunk_t *end_chunk; // 拿来扩容的,总是指向链表的最后一个结点
int end_pos;
atomic_ptr_t<chunk_t> spare_chunk; //空闲块(把所有元素都已经出队的chunk,称为空闲块),读写线程的共享变量
};
yqueue_t 结构,内部由⼀个⼀个chunk组成,每个chunk保存N个元素。chunk之间通过指针连接。
当队列空间不⾜时每次分配⼀个chunk_t,每个chunk_t能存储N个元素。
在数据出队列后,队列有多余空间的时候,回收的chunk也不是⻢上释放,⽽是根据局部性原理先回收到spare_chunk⾥⾯,当再次需要分配chunk_t的时候从spare_chunk中获取。
yqueue_t内部有三个chunk_t类型指针以及对应的索引位置:
- begin_chunk/begin_pos:begin_chunk⽤于指向队列头的chunk,begin_pos⽤于指向队列第⼀个元素在当前chunk中的位置。
- back_chunk/back_pos:back_chunk⽤于指向队列尾的chunk,back_po⽤于指向队列最后⼀个元素在当前chunk的位置。
- end_chunk/end_pos:由于chunk是批量分配的,所以end_chunk⽤于指向分配的最后⼀个chunk位置
这⾥特别需要注意区分back_chunk/back_pos和end_chunk/end_pos的作⽤:
- back_chunk/back_pos:对应的是元素存储位置;
- end_chunk/end_pos:决定是否要分配chunk或者回收chunk。
yqueue_t 构造函数
inline yqueue_t()
{
begin_chunk = (chunk_t *)malloc(sizeof(chunk_t));
alloc_assert(begin_chunk);
begin_pos = 0;
back_chunk = NULL; //back_chunk总是指向队列中最后一个元素所在的chunk,现在还没有元素,所以初始为空
back_pos = 0;
end_chunk = begin_chunk; //end_chunk总是指向链表的最后一个chunk
end_pos = 0;
}
说明:
采用chunk的机制,减少内存分配次数,采用spark_chunk的机制回收读完的chunk,充分利用局部性原理,提升性能。
end_chunk 总是指向最后分配的chunk,刚分配出来的chunk,end_pos也是0
back_chunk 在插入元素时才会指向对应的chunk,初始化的是是指向NULL的。
ront、back函数
inline T &front() // 返回的是引⽤,是个左值,调⽤者可以通过其修改容器的值
{
return begin_chunk->values[begin_pos]; // 队列⾸个chunk对应的的begin_pos
}
inline T &back() // 返回的是引⽤,是个左值,调⽤者可以通过其修改容器的值
{
return back_chunk->values[back_pos];
}
说明:
我们可以通过 begin_chunk->values[begin_pos] 获取到队列的头部,以及back_chunk->values[back_pos]获取到队列的尾部
Push函数
每次调用push,会更新 back_chunk 和 back_pos ,以及根据end_pos的值,决定是否需要重新分配chunk。
inline void push()
{
back_chunk = end_chunk;
back_pos = end_pos; //
if (++end_pos != N) //end_pos!=N表明这个chunk节点还没有满
return;
chunk_t *sc = spare_chunk.xchg(NULL); // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL
if (sc) // 如果有spare chunk则继续复用它
{
end_chunk->next = sc;
sc->prev = end_chunk;
}
else // 没有则重新分配
{
// static int s_cout = 0;
// printf("s_cout:%d\n", ++s_cout);
end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一个chunk
alloc_assert(end_chunk->next);
end_chunk->next->prev = end_chunk;
}
end_chunk = end_chunk->next;
end_pos = 0;
}
重新分配规则如下:
- 如果 ++end_pos != N 说明当前chunk还有空间,直接返回
- 如果++end_pos == N 说明,当前chunk只有N-1的位置可用,需要再按分配一个chunk。 这个chunk 会先尝试从spare_chunk获取,如果spare_chunk为NULL,则需要重新分配。
pop函数
pop的时候begin_pos就会++,也就是会更新front的位置。当chunk 中的所有元素都被取出才会触发chunk回收机制。spare_chunk的操作要求是原子操作,因为读写线程都会访问spare_chunk。
inline void pop()
{
if (++begin_pos == N) // 删除满一个chunk才回收chunk
{
chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next;
begin_chunk->prev = NULL;
begin_pos = 0;
// 'o' has been more recently used than spare_chunk,
// so for cache reasons we'll get rid of the spare and
// use 'o' as the spare.
chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快
free(cs);
}
}
3.2 ypipe_t
inline ypipe_t()
{
// Insert terminator element into the queue.
queue.push(); //yqueue_t的尾指针加1,开始back_chunk为空,现在back_chunk指向第一个chunk_t块的第一个位置
// Let all the pointers to point to the terminator.
// (unless pipe is dead, in which case c is set to NULL).
r = w = f = &queue.back(); //就是让r、w、f、c四个指针都指向这个end迭代器
c.set(&queue.back());
}
在ypipe对象创建的时候,更新yqueue的back_chunk的值,以及初始化 r w f c四个指针。
write函数
write的incomplete_标志位决定要不要更新f的位置
inline void write(const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back() = value_;
queue.push();
// Move the "flush up to here" poiter.
if (!incomplete_)
{
f = &queue.back(); // 记录要刷新的位置
// printf("1 f:%p, w:%p\n", f, w);
}
else
{
// printf("0 f:%p, w:%p\n", f, w);
}
}
flush函数
flush的核心是更新c指针和w指针的位置
inline bool flush()
{
// If there are no un-flushed items, do nothing.
if (w == f) // 不需要刷新,即是还没有新元素加入
return true;
// Try to set 'c' to 'f'.
// read时如果没有数据可以读取则c的值会被置为NULL,如果c==null 说明read线程在 休眠,可以安全的设置c的值
if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置
{
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set(f); // 更新w的位置
w = f;
return false; //线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理
}
else // 读端还有数据可读取
{
// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to 'f'.
w = f; // 只需要更新w的位置
return true;
}
}
read函数
read会先采取预读的机制,判断有无数据可读,可读就填充到value_。核心就是 check_read函数。主要通过比较c和队头的位置判断是否有数据可读,如果可读返回的就是flush后的f的位置。
inline bool check_read()
{
// Was the value prefetched already? If so, return.
if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true;
return true;
// There's no prefetched value, so let us prefetch more values.
// Prefetching is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
// 两种情况
// 1. 如果c值和queue.front()相等,返回旧c值,将c值置为NULL,说明此时没有数据可读
// 当c==&queue.front()时,代表数据被取完了,这时把c指向NULL,接着读线程会睡眠,这也是给写线程检查读线程是否睡眠的标志。
// 2. 如果c值和queue.front()不相等, 直接返回c值,此时可能有数据度的去
r = c.cas(&queue.front(), NULL); //尝试预取数据
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when items are being deallocated.
if (&queue.front() == r || !r) //判断是否成功预取数据
return false;
// There was at least one value prefetched.
return true;
}
inline bool read(T *value_)
{
// Try to prefetch a value.
if (!check_read())
return false;
// There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front();
queue.pop();
return true;
}