在日常中, 我们用到的数据结构有很多: 数组, 链表, 树等, 而在这些结构中, 还有一个叫做队列的存在。
和其他的集合相同, Java 原生提供了不同的实现。
而如果我们需要一个线程安全的队列的话, 可以基于实际的场景进行选择, 比如基于数组实现同时操作上会阻塞的 ArrayBlockingQueue, 基于链表同时也会阻塞的 LinkedBlockingDeque。
而今天我们聊的同样也是基于链表实现的线程安全的 ConcurrentLinkedQueue。
1 ConcurrentLinkedQueue 简单介绍
ConcurrentLinkedQueue 是 Java 中的一个并发队列实现, 位于 java.util.concurrent 包下。
它提供了一个基于链表实现的无界线程安全队列, 采用非阻塞算法实现并发操作。主要特点包括:
- 无界队列: ConcurrentLinkedQueue 不限制队列的大小, 可以动态地增长
- 非阻塞算法: 内部实现采用了非阻塞算法, 避免了传统锁的性能开销, 提高了并发性能
- 线程安全: 所有的操作都是线程安全的, 不需要额外的同步手段, 主用是通过 CAS(Compare and Swap)等无锁算法来实现的
- 高性能: 适用于高并发场景, 由于采用非阻塞算法, 避免了大部分锁的争用, 因而具有较好的性能表现
- FIFO 顺序: 队列遵循先进先出(FIFO)的原则, 保持元素插入的顺序
使用 ConcurrentLinkedQueue 可以方便地实现生产者 - 消费者模型, 适用于多线程环境下需要高效并发操作的场景。
需要注意的是, 由于该队列是无界的, 需要合理控制生产者的速度, 以防止队列无限制地增长。
2 存储数据的节点 Node
ConcurrentLinkedQueue 虽然叫做队列, 但是在内部的实现中是通过链表的方式实现的。
而说到链表就绕不开一个重要的属性: 链表的节点。
2.1 节点的属性定义
在 ConcurrentLinkedQueue 中的节点定义很简单
public class ConcurrentLinkedQueue<E> {
// 链表节点定义
private static class Node<E> {
/**
* 节点的数据
*/
volatile E item;
/**
* 下一个节点
*/
volatile Node<E> next;
}
}
这个节点的定义很简单, 就 2 个属性
- item: 存储在节点里面的数据
- next: 下一个节点
有些特殊的就是 item 和 next 都是用 volatile 修饰的, 保证了其可见性。
2.2 节点的方法
public class ConcurrentLinkedQueue<E> {
// 链表节点定义
private static class Node<E> {
/**
* CAS 更改 Node 中的数据域 item
* @param cmp 旧值
* @param val 新值
*/
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* CAS 更改 Node 中的指针域 next, 也就是修改当前节点的下一个节点
* @param val 新的值
*/
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
/**
* 通过 CAS 将 Node 中的 next 指针从 cmp 设置为 val
* 涉及 cas 的旧值比较, 一样才会替换
* @param cmp 旧值
* @param val 新值
*/
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
}
}
Node 本身就只提供了这 3 个方法, 这些方法实际上是通同 UNSAFE 的方法, 达到 CAS 式的修改值, 具备着原子性。
2.3 节点在 ConcurrentLinkedQueue 中的使用
public class ConcurrentLinkedQueue<E> {
/** 头结点 */
private transient volatile Node<E> head;
/** 尾结点 */
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
}
head, tail 2 个节点分别代表了链表的头和尾, 而 ConcurrentLinkedQueue 就是通过这 2 个节点对队列进行管理的。
通过无参的构造函数构造后, ConcurrentLinkedQueue 的结构如下:
一个很简单的链表。
3 ConcurrentLinkedQueue 的源码实现
3.1 ConcurrentLinkedQueue 的构造函数
ConcurrentLinkedQueue 的声明有 2 种 方式
第一种: 无参的构造函数
public class ConcurrentLinkedQueue<E> {
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
}
声明了一个值为空的节点, 并将头尾指针都指向这个节点, 结束。
第二种: 带 Collection 参数的构造方法
public class ConcurrentLinkedQueue<E> {
public ConcurrentLinkedQueue(Collection<? extends E> c) {
// 临时的头尾指针, 用来处理程序运行中的临时变量
Node<E> h = null, t = null;
// 遍历入参的集合
for (E e : c) {
/// 检查入参的数据是否为 null, 为 null 的话, 抛出异常
checkNotNull(e);
// 将数据封装为 Node 节点
Node<E> newNode = new Node<E>(e);
// 临时头节点为 null, 将当前节点设置为头节点和尾结点
if (h == null)
h = t = newNode;
else {
// 通过 CAS, 设置当前的尾结点节点的下一个节点 (next 属性) 为 newNode
t.lazySetNext(newNode);
// 更新当前的尾结点节点为新创建的节点, 也就是更新临时尾指针的位置
t = newNode;
}
}
// 头指针为空的话, 创建一个新的空值节点, 并将其赋给临时头尾指针
// 如果入参的集合为空, 就存在这种可能
if (h == null)
h = t = new Node<E>(null);
// 真正地更新头指针
head = h;
// 真正地更新尾指针
tail = t;
}
}
ConcurrentLinkedQueue 的构造方法很简单, 基本就是创建维护了一个链表, 自身持有了链表的头尾节点的引用。
3.2 ConcurrentLinkedQueue 新增数据 - offer() 方法
ConcurrentLinkedQueue 做为一个队列来说, 需要满足 FIFO 特性, 既插入元素总是在队列最末尾的地方进行插入, 而取 (移除) 元素总是从队列的队头。
那么可以从 offer(新增) 和 poll(取出) 2 个方法理解 ConcurrentLinkedQueue。
public class ConcurrentLinkedQueue<E> {
public boolean offer(E e) {
// 非空判断, 为空会抛出空指针异常
checkNotNull(e);
// 把数据封装为 Node 节点
final Node<E> newNode = new Node<E>(e);
// 死循环, 将封装好的 Node 节点放到当前队列的尾部, 直到成功, 返回 true
// p = t = tail = 尾结点
for (Node<E> t = tail, p = t;;) {
// q = p 的下一个节点 = 当前尾节点的下一个节点
Node<E> q = p.next;
// 通过 cas 将新增的节点设置在 p 的后面
// 因为尾结点 p 的后一个节点 q 为空, 所以将 p 的下一个节点设置为 newNode 成功后, 这时候 newNode 就是队列的尾部了
if (p.casNext(null, newNode)) {
// 当前节点设置在尾部成功了
// 当 p != t 时, 将创建的节点更新为尾部
// 什么时候会出现 p 不等于 t? 可以从放入第二个元素时开始考虑
if (p != t)
// 通过 cas 尝试将 tail 从 t 设置为 newNode
// 设置成功/失败都没关系, 即使失败了, 在后续的循环进行重新更新
casTail(t, newNode);
// 返回 true, 结束死循环
return true;
}
// 当前的 p 的 next 节点 q, 等于 q 自身,
// 这种情况一般会出现在有线程新增和有线程进行删除的情况, 我们可以在了解了 poll 后, 在回来看
else if (p == q)
// 尾结点变了 ? 取新的尾结点 : 头结点
p = (t != (t = tail)) ? t : head;
else
// p 不等于 t, 同时 tail 被别的线程修改了, 直接取修改后的尾结点 t, 没有的话, 取下一个节点
p = (p != t && t != (t = tail)) ? t : q;
}
}
/**
* cas 操作, 将当前队列的 tail 从 cmp 设置为 val
*
* @param cmp 旧值
* @param val 新值
*/
private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
}
offer 方法里面的 for 循环分析
注: 规定 ConcurrentLinkedQueue 是通过无参的构造方法声明的, 初始的节点为 Node-0
第一次往里面放数据时 (存放的值为 e)
- Node p = Node t = tail, tail 指向的是一个值为 null, next 为 null 的初始节点 Node-0
- Node q = p.next, 因为 tail.next = null, 所以 q = null
- 走进 if 里面的判断, 通过 cas 操作, 将 p.next 从 null 设置为 newNode 节点
- cas 设置成功了, 进入到里面的第二个 if, 这时候 p 还是等于 t 的, 所以直接返回 true
如图, 此时队列的尾节点应该为 Node-1, 而 tail 指向的节点依然还是 Node-0, 从上面的流程看下来, 也没有对 tail 指针重试设置指向尾部的操作 (实际 tail 的重新指向最新的尾结点的, 由下次的添加数据进行更新),
所以 ConcurrentLinkedQueue 内的 tail 并不一定都是指向尾指针, 具有延迟性。
第二次往里面放数据时 (存放的值为 f)
- Node p = Node t = tail, tail 指向的是一个值为 e, next 为 Node-1 的初始节点 Node-0
- Node q = p.next, 这时候因为 p = tail, tail.next = Node-1, 所以 q = Node-1
- 因为 p.next 不等于 null, p != q, 所以走到了 else 里面的逻辑, 这时候 p == t 的, 所以 p = q, 也就是 p = Node-1 了
- 回到循环体的第一步, Node q = p.next = Node-1.next = null, 所以 q = null
- 走进 if 里面的判断, 通过 cas 操作, 将 p.next 从设置为 newNode 节点
- 这时 p != t 了, 通过 cas 操作将 tail 从 t 设置到 newNode, 更新尾结点 tail
这时候的样子为
通过分析, 我们可以整理出 poll 的执行逻辑为
- 如果 tail 指向的节点的下一个节点 (next 域) 为 null 的话, 说明 tail 指向的节点即为队列真正的队尾节点, 因此可以通过 casNext 插入当前待插入的节点, 但此时 tail 并未变化
- 如果 tail 指向的节点的下一个节点 (next 域) 不为 null 的话, 说明 tail 指向的节点不是队列的真正队尾节点。通过q (Node q = p.next) 指针往前递进去找到队尾节点, 然后通过 casNext 插入当前待插入的节点, 并通过 casTail 方式更改 tail
不知道你有没有留意到 p = (p != t && t != (t = tail)) ? t : q;
这行代码呢?
按照我们上面一步一步的分析下去,p 不可能被赋值为 t 的。 那么我们需要从多线程的角度思考了。
从多线程分析 offer 方法
其实在多线程环境下这行代码很有意思的。t != (t = tail) 这个操作并非一个原子操作, 这个可以看成 2 个效果
- t = tail
- 判断赋值前的 t != tail
在循环中一开始 t = tail, 那么在多线程的情况下可能存在另一个线程将 tail 修改了, 导致了 t != tail。
至于为什么 t != (t = tail) 是旧的 t 值和修改后的 tail 进行比较,因为涉及到一点 JVM 的知识, 感兴趣的, 可以看一下下文的附录, 这里就不展开了。
3.3 ConcurrentLinkedQueue 删除数据 - poll() 方法
public class ConcurrentLinkedQueue<E> {
public E poll() {
// Java 标签写法
restartFromHead:
for (;;) {
// p = h = head = 头节点, q = null
// 从头部往后遍历
for (Node<E> h = head, p = h, q;; p = q) {
final E item;
// 将 p 的值赋给 item, item 不为空同时通过 cas 将 p 的 item 属性从 item 设置为 null
if ((item = p.item) != null && p.casItem(item, null)) {
// 设置成功了 出现了 p 的节点 不等于 h 也就是 head 节点, 更新头节点
if (p != h)
// 更新最新的头部
// p 的下一个节点不为 null 吗 ? 是的话就取 p 的下一个节点,不是的话,还是取 p
updateHead(h, ((q = p.next) != null) ? q : p);
// 返回 item 值
return item;
}
// 没有其他的节点了
else if ((q = p.next) == null) {
// 把头节点从 h 设置为 p
updateHead(h, p);
return null;
}
// p q 是同一个节点, 回到循环头部, 重新开始
else if (p == q)
continue restartFromHead;
else
// 剩余的情况, p = q
p = q;
}
}
}
/**
* 尝试更新头结点
* @param h 头节点当前的值
* @param p 头节点将要被设置的值
*/
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
}
一看到这段代码,第一眼应该是感觉到奇怪吧
restartFromHead:
Java 的一个标签语法支持,搭配 break, continue, 可以直接跳出循环, 类似于 goto 的特性。
ok,进入代码的分析
poll 方法里面的 for 循环分析
假设初始的链表的样子为
第一次往里面取数据时
- Node h = Node p = head = 队列的头结点
- 取到头结点的值 item
- item 不为空, 尝试通过 cas 将 p 也就是头结点的 item 属性设置为 null
- 更新成功了, 这时候 p == h, 直接返回 item 值
这时候 head 还是没有改变,还是指向了原来的头结点, 同时 head 节点的 item 属性等于 null, next 执行了下一个节点
第二次往里面取数据
- Node h = Node p = head = 队列的头结点
- 取到头结点的值 item
- 经过第一次后,这时候 item == null, 走下一个判断
- (q = p.next) 不为 null, 走下一个判断
- q = p.next, p != q, 走下一个判断
- 这时候走到了最后的 else, 也就是 p = q 也就是 p = p.next
- 又回到循环的头部, 次数 item = p.item
- item 不为空, 尝试通过 CAS 将 p 也就是头结点的 item 属性设置为 null
- 更新成功了, 这时候 p != h, 进入到判断内部
- 这时候的 p 的 item 为空了,p 的下一个节点不为空的话,将 p 的下一个节点设置为新的头结点,如果 p 的下一个节点为空的话, 也就是队列没有其他的节点了, 那么只能取 p 作为尾结点了
- 进入到 updateHead 方法, h != p, 同时将头节点从 h 也就是 head 设置为 p
- 更新头节点成功后,将原本旧的头节点 h 的 next 设置为自身,这种 next 指向自身的节点称之为哨兵节点, 一般表示为要删除的节点或者是空节点
- 返回 item 值,结束
按照上面的流程,循环体内的判断还有 1 个 else if 的逻辑是不会走进去的,那么什么情况下, 才会跨入这个判断
else if (p == q)
continue restartFromHead;
这个判断主要用于处理多个线程 poll 的情况, q = p.next 也就是说 q 永远指向的是 p 的下一个节点,那么什么情况下会使得 p, q 指向同一个节点呢?
根据上面我们的分析,只有 p 指向的节点在 poll 的时候转变成了哨兵节点 (通过 updateHead 方法中的 h.lazySetNext),
当线程 A 在判断 p==q 时,线程 B 已经将执行完 poll 方法将 p 指向的节点转换为哨兵节点并且 head 指向的节点已经发生了改变, 所以就需要从 restartFromHead 处执行,保证用到的是最新的 head。
4 部分线程执行 offer() 方法, 部分线程执行 poll() 方法
在分析 offer 方法的时候我们还留下了一个问题 if (p == q)
让 if 判断为 true 的情况为 p 指向的节点为哨兵节点,而什么时候会构造哨兵节点呢?
在对 poll 方法的讨论中,我们已经找到了答案,即当 head 指向的节点的 item 域为 null 时会寻找真正的队头节点,等到待插入的节点插入之后,会更新 head,并且将原来 head 指向的节点设置为哨兵节点。
模拟一下,大概是这样的过程:
第一步: 初始队列
这时,只要一个节点 Node-0, tail 和 head 都指向 Node-0
第二步: 线程 A 放入一个元素 e
这时,新增了一个节点 Node-1, tail 和 head 都还是指向 Node-0
第三步: 线程 B 取出一个节点
这时, head 指向了 Node-1, 但是 tail 还是指向了一个 next 指向自己的哨兵节点 Node-0
第四步:线程 A 继续放入一个元素 f
一开始 Node p = Node t = tail, Node q = p.next = q, 所以 p == q 成立了。这时候,tail 没变,所以 t != t 为 false, t 取值变为 head。
5 HOPS 设计
通过上面对 offer 和 poll 方法的分析,可以发现 tail 和 head 是延迟更新的,两者更新触发时机为:
tail 更新触发时机: 当 tail 指向的节点的下一个节点不为 null 的时候, 会执行定位队列真正的队尾节点的操作, 找到队尾节点后完成插入之后才会通过 casTail 进行 tail 更新。
当 tail 指向的节点的下一个节点为 null 的时候,只插入节点不更新 tail
head 更新触发时机: 当 head 指向的节点的 item 域为 null 的时候,会执行定位队列真正的队头节点的操作, 找到队头节点后完成删除之后才会通过 updateHead 进行 head 更新,
当 head 指向的节点的 item 域不为 null 的时候,只删除节点不更新 head
并且在更新操作时,源码中会有注释为hop two nodes at a time, 所以这种延迟更新的策略就被叫做 HOPS 的大概原因是这个。从上面更新时的状态图可以看出,
head 和 tail 的更新是 “跳着的”, 即中间总是间隔了一个。那么这样设计的意图是什么呢?
如果让 tail 永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点: 如果大量的入队操作,每次都要执行 cas 进行 tail 的更新,汇总起来对性能也会是大大的损耗。
如果能减少 cas 更新的操作,无疑可以大大提升入队的操作效率,每间隔 1 次 (tail 和队尾节点的距离为 1) 才利用 cas 更新 tail。 对 head 的更新也是同样的道理。
虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。
6 附录
在上面的 offer() 方法中有一段代码 (p != t && t != (t = tail))
, 有说到 t != (t = tail)
, 中实际的效果是 先把当前的 tail 赋值给 t, 然后用赋值前的 t 判断是否等于 tail。
这设计到 JVM 的操作数栈和局部变量表 的部分知识。
可以简单的先理解当前有一个栈和一堆人为定义好的操作指令。这些指令会将各种数据往这个栈里面扔数据, 获取数据等。
还有一个局部变量表, 可以简单的看出一个 HashMap, 方法中声明的变量, 变量的值都存在这个 HashMap 中。
几个等一下需要用到的指令:
直接指令 | 说明 |
---|---|
bipush | 将单字节的常量值(-128 ~ 127)推送至栈顶 |
istore_{n} | 将栈顶的整数弹出,并且赋值给局部变量表中的 index 为 n 的元素 |
iload_{n} | 局部变量表中的 index 为 n 的元素放到栈顶 |
if_icmpeq | 比较栈顶两 int 型数值大小,当结果等于 0 时跳转 |
dup | 复制栈顶数值并将复制值压入栈顶 |
iconst_{0} | 将 int 型 n 推送至栈顶 |
看一下, 下面的例子
先看一下 p, t 不相同的情况。
public class Main {
public static void main(String[] args) {
int t = 8;
int p = t;
int tail = 9;
// 因为 p = t = 8
// 所以第一步 p != t 就是 false, 后面的逻辑可以省略, 不进行比较了
boolean result = (p != t && t != (t = tail));
System.out.println("p=" + p + ", t=" + t + ", result=" + result);
}
}
上面的代码修改为字节码的形式如下:
public class Main {
public static void main(String[] args) {
// 在栈顶 放入 8
0: bipush 8
// 将栈顶(8)弹出,放到局部变量表第二个的位置(t)
2: istore_1
// 将局部变量表第二个的位置(t)的值(8),压到栈顶
3: iload_1
// 将栈顶(8)弹出,放到局部变量表第三个的位置(p)
4: istore_2
// 在栈顶 放入 9
5: bipush 9
// 将栈顶(9)弹出,放到局部变量表第四个的位置(tail)
7: istore_3
// 将局部变量表第三个的位置(p)的值,压到栈顶
8: iload_2
// 将局部变量表第二个的位置(t)的值,压到栈顶
9: iload_1
// p = 8, t = 8 2个的比较结果等于 false(0) , 跳转到 24
10: if_icmpeq 24
13: iload_1
14: iload_3
15: dup
16: istore_1
17: if_icmpeq 24
20: iconst_1
21: goto 25
// 在栈顶压入 0 这里的 0 可以看为 false
24: iconst_0
// 将栈顶(0)弹出,放到局部变量表第五个的位置(result)
25: istore 4
// 输出
27: getstatic #7 // Field java/lang/System.out:Ljava/io/PrintStream;
30: iload_2
31: iload_1
32: iload 4
34: invokedynamic #13, 0 // InvokeDynamic #0:makeConcatWithConstants:(IIZ)Ljava/lang/String;
39: invokevirtual #17 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
42: return
}
}
再看一下 p, t 相同的情况,这时候的代码如下:
public class Main {
public static void main(String[] args) {
int t = 8;
int p = t;
int tail = 9;
// 因为 p = t = 8
// 所以第一步 p == t 就是 true, 进行后面的比较了
boolean result = (p == t && t != (t = tail));
System.out.println("p=" + p + ", t=" + t + ", result=" + result);
}
}
上面的代码修改为字节码的形式如下:
public class Main {
public static void main(String[] args) {
0: bipush 8
2: istore_1
3: iload_1
4: istore_2
5: bipush 9
7: istore_3
8: iload_2
9: iload_1
// 上面的流程差不多,省略
// 比较 p == t 结果为true(1) 不跳转,走下一步
10: if_icmpne 24
// 将局部变量表第二个的位置(t)的值,压到栈顶
13: iload_1
// 将局部变量表第四个的位置(tail)的值,压到栈顶
14: iload_3
// 复制栈顶数值并将复制值压入栈顶。即复制 tail 变量值并压入栈顶(9)
15: dup
// 将栈顶(9)弹出, 将栈顶数值存入局部变量表第二个的位置(t), 此时 t = 9
16: istore_1
// 比较栈顶两 int 型数值大小,也就是赋值前的 t 和 tail, 此时 8 != 9 结果为 true(1) 所以不会跳转到24行,继续执行下一行
17: if_icmpeq 24
// 将 int 型 1 压入栈顶
20: iconst_1
// 无条件跳转到 25 行
21: goto 25
24: iconst_0
// 将栈顶 1 存入局部变量表第五个的位置,同时出栈 result = 1
25: istore 4
// 内容输出
27: getstatic #7 // Field java/lang/System.out:Ljava/io/PrintStream;
30: iload_2
31: iload_1
32: iload 4
34: invokedynamic #13, 0 // InvokeDynamic #0:makeConcatWithConstants:(IIZ)Ljava/lang/String;
39: invokevirtual #17 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
42: return
}
}
通过上面字节码的逐步分析,我们可以得出 t != (t = tail)
实际的效果为
- t = tail
- 判断未改变值前的 t != tail
一旦有另一个线程将 tail 修改了,就会出现 t != tail 为 true
7 参考
ConcurrentLinkedQueue简介
Java并发容器–ConcurrentLinkedQueue