概叙
**Queue接口与List、Set同一级别,都是继承了Collection接口**。队列是一种数据结构.它有两个基本操作:在队列尾部加人一个元素,和从队列头部移除一个元素,队列以一种先进先出的方式管理数据。
队列分为两种,阻塞队列和非阻塞队列。阻塞队列是如果你试图向一个 已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索,将导致消费者线程或者生产者线程阻塞,非阻塞队列是能即时返回结果(消费者),但必须自行编码解决返回为空的情况处理(以及消费重试等问题)。这两种都是线程安全的。
1.阻塞队列BlockingQueue ArrayBlockingQueue,DelayQueue,LinkedBlockingDeque,LinkedBlockingQueue,LinkedTransferQueue,PriorityBlockingQueue,SynchronousQueue
2.非阻塞队列 ConcurrentLinkedDeque,ConcurrentLinkedQueue
理论上非阻塞队列更高效,但是实际应用中阻塞队列已经可以应付大多数的并发了。
并编程中,一般需要用到安全的队列,如果要自己实现安全队列,可以使用2种方式:
- 加锁,这种实现方式就是我们常说的阻塞队列。
- 使用循环CAS算法实现,这种方式实现队列称之为非阻塞队列。
加锁队列的实现较为简单,这里就略过,我们来重点来解读一下非阻塞队列。 从点到面, 下面我们来看下非阻塞队列经典实现类:ConcurrentLinkedQueue (JDK1.8版)
看下ConcurrentLinkedQueue的结构图
从内图可以了解ConcurrentLinkedQueue一个大概,ConcurrentLinkedQueue内部持有2个节点:head头结点,负责出列, tail尾节点,负责入列。
而元素节点Node,使用item存储入列元素,next指向下一个元素节点。
1 2 3 4 5 |
|
1 2 3 4 5 6 |
|
ConcurrentLinkedQueue使用特点
- 不允许null入列
- 在入队的最后一个元素的next为null
- 队列中所有未删除的节点的item都不能为null且都能从head节点遍历到
- 删除节点是将item设置为null, 队列迭代时跳过item为null节点
- head节点跟tail不一定指向头节点或尾节点,可能存在滞后性
之所以有这奇葩约定,全因ConcurrentLinkedQueue是并发非阻塞队列决定的。 我们从源码上看一下ConcurrentLinkedQueue实现过程
ConcurrentLinkedQueue源码详解
入列offer
我们印象中链表特点:tail节点表示最后一个节点, head表示第一个节点。ConcurrentLinkedQueue 跟传统的链表有点区别,在单线程环境下符合传统链表特点,但涉及到多线程环境,ConcurrentLinkedQueue 中的tail节点不一定是最后一个节点,他可能是倒数第二个。所以ConcurrentLinkedQueue判断队尾条件是节点的next为null。
public boolean offer(E e) {
checkNotNull(e); //为空判断,e为null是抛异常
final Node<E> newNode = new Node<E>(e); //将e包装成newNode
for (Node<E> t = tail, p = t;;) { //循环cas,直至加入成功
//t = p = tail
Node<E> q = p.next;
if (q == null) { //判断p是否为尾节点
//如果是,p.next = newNode
if (p.casNext(null, newNode)) {
//首次添加时,p 等于t,不进行尾节点更新,所以所尾节点存在滞后性
//并发环境,可能存添加/删除,tail就更难保证正确指向最后节点。
if (p != t)
//更新尾节点为最新元素
casTail(t, newNode);
return true;
}
}
else if (p == q)
//当tail不执行最后节点时,如果执行出列操作,很有可能将tail也给移除了
//此时需要对tail节点进行复位,复位到head节点
p = (t != (t = tail)) ? t : head;
else
//推动tail尾节点往队尾移动
p = (p != t && t != (t = tail)) ? t : q;
}
}
分析
1、初始化
2、添加A元素
3、添加B元素
4、添加C
从图上看tail不一定执行最后一个节点,但可以确定最后节点的next节点为null。
到这可能朋友问他,并发环境什么情况都有可能,ConcurrentLinkedQueue是怎么保证线程安全的? 我们观察offer方法的设计,
1:是一个死循环,就是不停使用cas判断直到添加元素入队成功。
1 |
|
2:2个cas判断方法 p.casNext(null, newNode) 确保队列在入列时是原子操作
1 2 3 |
|
casTail(t, newNode); 确保tail队尾在移动改变时是原子操作
1 2 3 |
|
而在并发环境,ConcurrentLinkedQueue入列线程安全考虑具体可分2类:
1>线程1线程2同时入列 这个好理解, 线程1,线程2不管在offer哪个位置开始并发,他们最终的目的都是入列,也即都需要执行casNext方法, 我们只需要确保所有线程都有机会执行casNext方法,并且保证casNext方法是原子操作即可。casNext失败的线程,可以进入下一轮循环,人品好的话就可以入列,衰的话继续循环
2>线程1遍历,线程2入列 ConcurrentLinkedQueue 遍历是线程不安全的, 线程1遍历,线程2很有可能进行入列出列操作, 所以ConcurrentLinkedQueue 的size是变化。换句话说,要想安全遍历ConcurrentLinkedQueue 队列,必须额外加锁。
但换一个角度想, ConcurrentLinkedQueue 的设计初衷非阻塞队列,我们更多关注入列与出列线程安全,这2点能保证就可以啦。
出列poll
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
//入列折腾的tail,那出列折腾的就是head
E item = p.item;
//出列判断依据是节点的item=null
//item != null, 并且能将操作节点的item设置null, 表示出列成功
if (item != null && p.casItem(item, null)) {
if (p != h)
//一旦出列成功需要对head进行移动
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
//第一轮操作失败,下一轮继续,调回到循环前
continue restartFromHead;
else
//推动head节点移动
p = q;
}
}
}
看图, 被移动的节点(item为null的节点)会被jvm回收。
但是有个问题, tail也被回收, 那ConcurrentLinkedQueue就没有tail节点了。
如果此时再添加一个D元素时,会出现什么情况?
好问的朋友,又想了,ConcurrentLinkedQueue怎么保证出列线程安全?道理跟之前入列一样,cas保证原子操作即可。
ConcurrentLinkedQueue使用示例
阻塞队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。
而非阻塞的实现方式则可以使用循环CAS的方式来实现。因为采用CAS操作,允许多个线程并发执行,并且不会因为加锁而阻塞线程,使得并发性能更好。
示例:
package yxxy.queuue;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.sun.corba.se.impl.protocol.JIDLLocalCRDImpl;
public class TEST1 {
private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
public static void main(String[] args) throws InterruptedException {
Thread[] ths = new Thread[6];
for(int i=1;i<ths.length;i++){
Runnable task = new Runnable() {
@Override
public void run() {
try {
while (!queue.isEmpty()) {
int i = queue.poll();
}
} catch (Exception e) {
System.out.println();
}
}
};
ths[i] = new Thread(task);
}
Runnable task = new Runnable() {
@Override
public void run() {
for(int i=0;i<99999;i++){
try {
queue.offer(i);
} catch (Exception e) {
}
}
}
};
ths[0] = new Thread(task);
long timeStart = System.currentTimeMillis();
runAndComputeTime(ths);
System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
}
static void runAndComputeTime(Thread[] ths) {
Arrays.asList(ths).forEach(t->t.start());
Arrays.asList(ths).forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
当消费者速度大于生产者并且生产者速度不变的情况下,处理瓶颈就处在了消费者线程分别取队列里元素的速度上,所以此时使用不加锁的方式来取队列的元素会大大提高处理速度。