1. 概述
在我们的日常开发中,经常会使用队列这种数据结构,需要它的队尾进、队头出的特点。于是,Doug Lea大师设计了一个线程安全的队列ConcurrentLinkedQueue,它是采用链表的形式构成的。我们接下来尝试通过代码去了解其中的设计思想。
2. 成员变量
private static class Node<E> {
volatile E item; // 元素值
volatile Node<E> next; // 指向下一个节点
}
ConcurrentLinkedQueue底层采用的是链表的数据结构,因此通过静态内部类定义了链表节点Node,包含两个基本的变量。
// 头指针
private transient volatile Node<E> head;
// 尾指针
private transient volatile Node<E> tail;
成员变量也比较简单,包括头指针和尾指针。
3. 构造方法
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
常用的构造方法是这个无参的构造方法,创建了一个空节点,并让头指针和尾指针都指向这个节点。具体状态如下图所示:
4. offer方法
public boolean offer(E e) {
checkNotNull(e); // 插入的元素必须非null
final Node<E> newNode = new Node<E>(e); // 创建新的节点
for (Node<E> t = tail, p = t;;) { // 尾指针指向赋值为p
Node<E> q = p.next; // q指向p的下一个节点
if (q == null) { // 如果p是真正的尾节点
if (p.casNext(null, newNode)) { // 通过CAS方式设置尾节点为新的节点
if (p != t) // 如果尾指针指向的不是真的尾节点
casTail(t, newNode); // CAS更新尾指针,允许失败
return true;
}
}
else if (p == q) // 哨兵,有元素从队头出队了
p = (t != (t = tail)) ? t : head;
else // 定位新的队尾节点
p = (p != t && t != (t = tail)) ? t : q;
}
}
初次看上面这份offer的代码,可能会看得云里雾里的,因为这份代码考虑了很多多线程不安全的情况,所以就会比较复杂。
我们先从单线程角度来看这份代码。首先,尾指针tail指向的节点赋值给节点p,p的下一个节点被赋值给节点q。然后判断q为null的话,就意味着p就是链表中的最后一个节点。所以就尝试用CAS方法将p的下一个节点设置为newNode,如果设置成功,接着尝试用CAS方法设置newNode为尾节点。这和我们常见的尾插法是一样的,但是因为casTail方法可能失败,就有可能出现下面这种情况:
尾指针不一定在每一时刻都指向真正的尾节点,可能存在延迟更新的情况。 因此代码中,并不会直接对Tail指向的节点插入newNode,而是通过p去寻找真正的尾节点,再用CAS的方式插入newNode。
我们再来看这行代码
p = (p != t && t != (t = tail)) ? t : q;
如果是单线程情况下,那么t肯定是等于tail的,所以三元表达式的值一直都是p=q,也即让p不断向后寻找,直到找到真的尾节点。
但是在多线程情况下,t != (t = tail)其实不是一个原子操作,可能会出现这样的情况:
对于等式的左边,线程A先读取了变量t。接着线程B立刻修改了尾节点,这就会造成线程A在读取(t=tail)时发现两者不一样,就会得到p=t的结果。这样做的 意义在于此时(t=tail)是最新的尾节点,所以让p指向t可以更快地找到尾节点进行插入操作。
而对于这段代码
else if (p == q):
p = (t != (t = tail)) ? t : head;
这主要是涉及到哨兵节点,当发现有线程在队头出队列后,就会出现p=q的情况,就需要从头指针重新开始搜索头节点,即p=head。
5. poll方法
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) { // 头指针节点复制给p,h
E item = p.item; // p节点的元素值
// 如果不为null,说明是真的头节点,则CAS尝试设置元素值为null
if (item != null && p.casItem(item, null)) {
// 如果p节点和最初的h节点不一样,说明head指针指向的不是真的头节点
if (p != h)
// 更新头节点
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 如果没有下一个节点了,无法删除了
else if ((q = p.next) == null) {
updateHead(h, p); // 同样更新头节点
return null;
}
else if (p == q) // 哨兵节点情况
continue restartFromHead; // 一切从头来过
else // 寻找真的头节点,p节点不断往后寻找
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p)) // CAS设置新的头节点
h.lazySetNext(h);
}
// CAS设置下一个节点
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
poll方法实际上和offer方法的思想是一样的,都是使用了延迟更新的方式。当单线程的情况下,要删除队头元素,就是要找到头指针指向的节点,进行删除就行了。但是在多线程情况下,头指针指向的节点不一定是真的头节点,可能这个节点之前已经被删除过了。如初始状态:
因此,代码中首先将头指针指向的节点赋值给p和h。如果p中的元素值不为null,说明是真的头节点,尝试用CAS设置元素值为null,即完成出队操作。如果p中的元素值为null,那就说明这个节点之前就已经被删除过了,就需要通过p= q = p.next不断往后寻找,找到真正的头节点。比如下图就是head节点实际指向了已经被删除过的节点:
因此就通过p节点去寻找到真正的头节点:
然后我们通过下面的代码去更新新的头节点:
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p)) // CAS设置新的头节点
h.lazySetNext(h);
}
在代码中,我们会看到h.lazySetNext(h)这一行,其实就是将已经删除过的节点自己指向自己。这么做的原因就是会提醒offer和poll线程,有元素出队了,需要重新计算,就会出现之前说的p=q的情况,因为q=p.next。这样就是我们哨兵节点的作用。
6. 总结
ConcurrentLinkedQueue的设计方式采用的是延迟更新头指针和尾指针。通常我们为了保证同步,在入队和出队的时候需要加入synchronized或者锁,但是这种方式是消耗很大的。而Doug Lea大师使用CAS的方式轻松化解线程不安全问题。其实他也可以通过自旋的方式来做到实时更新头指针和尾指针,但是这会带来一定的消耗。延迟更新的设计方式则可以大大减少CAS的次数,提升效率。
参考文章:并发容器之ConcurrentLinkedQueue