文章目录
- 1 ConcurrentLinkedQueue是什么
- 2 核心属性详解
- 3 核心方法详解
- 3.1 add(E e)
- 3.2 offer(E e)
- 3.3 poll()
- 3.4 size()
- 3.5 并发情况分析
- 4 总结
1 ConcurrentLinkedQueue是什么
ConcurrentLinkedQueue是一个无界的并发队列,和LinkedBlockingQueue相比,它是通过完全的cas实现的,是非阻塞的。LinkedBlockingQueue是通过ReentrantLock实现的,提供了一些阻塞方法,如take() put()。
2 核心属性详解
//链表的头和尾节点
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
//Node的数据结构
private static class Node<E> {
//保存的元素
volatile E item;
//单向链表的当前Node的next节点
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
//cas设置当前item值
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//设置next节点的值
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//cas设置next节点的值
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//...
}
3 核心方法详解
3.1 add(E e)
调用offer方法。在
public boolean add(E e) {
return offer(e);
}
3.2 offer(E e)
首先在看这个方法之前,先了解一个掌握逻辑的方法。因为下面代码是无锁自旋(cas)代码,所以有很多触发条件,如果直接看是很难懂,
所以这里的小技巧是先不管多线程,去看逻辑。如下面的for循环,你先按照单线程调用了3~4次看看数据变化。先掌握它正常逻辑下的数
据结构的变化。因为是单向链表,看看节点之间是怎么变化的。
看下面流程再回头看这段代码。
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
Node<E> t = tail;
Node<E> p = t;
for (;;) {
Node<E> q = p.next;
if (q == null) {
// 追加节点 原子性操作,会有失败的情况
if (p.casNext(null, newNode)) {
// 跃过第一次设置tail
if (p != t) // hop two nodes at a time
//设置尾节点
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
//poll情况,即存和取同时发生
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// 第二次设置的时候q!=null的情况重新设置p节点往后移
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
1.如果当前链表中无元素,此时根据构造器可知 head = tail = new Node<>(null); 此时添加一个元素。如图所示
此时 p.next == null 成立,所以会进入 casNext语句。此时成功了 p == t 是true, 所以返回true结束,此时数据结构变成下图
此时我再添加一个元素,p.next != null了,p == q也不成立, 所以走到最后一个else:p = (p != t && t != (t = tail)) ? t : q;
这段逻辑相当于 t =tail; 因为p == t 所以 p 变成了q。再次循环。
此时p就是NODE1了 q 是null了 走p.casNext设置NODE2 称为NODE1的next节点。注意!! 此时tail.next还是NODE1。如下图
此时再添加一个元素呢
此时流程中会命中p != t 重新设置tail, 此时node3就是tail
3.3 poll()
首先现在的数据结构是这样。
- 执行刚开始的时候 p 指向的是head 此时p的item == null。执行到 else p = q; 注意因为执行到else if ((q = p.next) 此时q = p.next,即p的下标到了head.next。此时在判断item是否是null 此时不是null了,去除n1 然后cas设置为null 此时p != h 因为往后移了一下,又因为 node1.next !=null 所以更新head为未node2。
此时如下图
- 如果再次poll
此时 p指向的是node2, 此时p.item != null 所以直接cas 设置成null 此时p == h成立 直接return,此时如下图
其实head是没动的。下次呢 此时item是空,那么又会向第一步一样。至此正常流程已经分析完
public E poll() {
restartFromHead:
for (;;) {
Node<E> h = head;
Node<E> p = h;
Node<E> q = null;
for (;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
3.4 size()
注意,因为他没有维护count字段,所以他计算数量是遍历计算的。不维护是因为上面是通过cas方式+循环保证原子性的,如果在加一个count字段,那失败重试的概率将大大增加
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
3.5 并发情况分析
上面已经分析了核心的入队列和出队列的两个方法,他不是实时更新head和tail节点,而是通过一次循环之后更新head和tail节点.
此时并发情况下,cas保证了原子性的设置。
-
offer方法
**p.casNext(null, newNode)**保证了原子性的追加链表元素。成功了设置tail 此时第一步成功不代表第二步(casTail(t, newNode))一定成功,因为此时可能别的线程已经改了tail。失败了怎么办呢? 失败了其实就是其他线程在offer的时候多循环几次,但是总有一个线程可以把第二步成功,也就是tail最后会回到尾部的。
p == q的情况,即p = p.next 出现这种情况就是此时p已经被移除 -
poll方法
(q = p.next) == null的情况是p从链表中删除,此时重新循环链表
4 总结
相对于LinkedBlockingQueue, 它实现了无锁化的方式。因为cas+for这种方式的逻辑很难梳理。所以大致了解思路吧。