1、SynchronousQueue介绍
SynchronousQueue与前边的其他几个阻塞队列的差异是挺大的,在一般逻辑中队列是一个用
来存储数据的中间容器(前边几个阻塞队列也是用来存放数据的),但SynchronousQueue
却不是用来存放数据的,它是把生产者和消费者放到当前的SynchronousQueue队列中去配对
,也可以认为 SynchronousQueue不存储数据,只是存储生产者和消费者;
SynchronousQueue 特点:
1)当存储一个生产者到 SynchronousQueue 后,此时若没有消费者去跟他配对,生产者
会阻塞,什么方式阻塞要看存储生产者时调用的方法,如:
put(E e) :生产者在放到SynchronousQueue同时,如果有消费者在等待配对,
则直接配对;如果没有消费者在等待配对,生产者会一直阻塞等待
add(E e) :生产者在放到SynchronousQueue同时,如果有消费者在等待配对,
则直接配对,否则抛出异常,退出
offer(E e):生产者在放到SynchronousQueue同时,如果有消费者在等待配对,
则直接配对;如果没有消费者在等待配对,则直接返回false,退出
offer(E e,long timeout,TimeUnit unit) :生产者在放到SynchronousQueue同时
如果有消费者在等待配对,则直接匹配;如果没有消费者在等待配对
,则阻塞等待一段时间(timeout),若等待超过了超时时间还没有
消费者来配对,则返回false;若在生产者等待过程中被中断,则直接
抛出异常,退出。
2)生产者最终会有几种结果:
(1)如果在生产者阻塞期间有消费者来匹配,生产者就会将绑定的消息交给消费者
(2)如果生产者阻塞结束,或者生产者不允许阻塞,则生产者就直接失败
(3)如果生产者在阻塞期间,生产者线程被中断,则直接退出失败
3)同理,消费者和生产者的效果是一样的,消费者的 poll()、poll(timeout,unit)、take() 、remove() 等方法原理跟上边生产者方法差不多,区别是意思反过来,如下:
remove(): 消费者放到 SynchronousQueue 时,若有生产者在等待配对,则立即
配对,都则抛出异常退出
take():消费者放到 SynchronousQueue 时,若有生产者在等待配对,则立即
配对,否则一直等待直到有生产者来配对
poll():消费者放到 SynchronousQueue 时,若有生产者在等待配对,则立即
配对;否则返回null,直接退出
poll(timeout,unit):消费者放到 SynchronousQueue 时,若有生产者在等待配
对,则直接配对;否则消费者挂起等待,直到等待超过超时时间后还没
有生产者来配置,则返回null,退出;若在阻塞等待过程中,消费者被
中断,则直接抛出异常,退出
4)生产者和消费者的数据时配对后直接传递的,不会经过SynchronousQueue,
SynchronousQueue 不存储数据
2、SynchronousQueue核心属性
2.1、Transferer
进入 SynchronousQueue 类内部发现,SynchronousQueue提供了一个抽象内部
类 Transferer,在内部类 Transferer 中定义了方法 transfer;方法 transfer 结构如下:
注意:
根据角色的不同,生产者与消费者调用方法 transfer 时是有点区别的,即:
生产者在调用 transfer 方法时,第一个参数e 会正常传递数据(生产者绑定的数 据)
消费者在调用 transfer 方法时,第一个参数 e 会传递null值
内部类 Transferer 有2个子类,在子类中分别实现了方法transfer ,子类分别是:
SynchronousQueue.TransferQueue 基于队列实现
SynchronousQueue.TransferStack 基于栈实现
注意:在实例化 SynchronousQueue 会指定使用哪个Transferer 子类,如下图所示:
由构造方法可以发现 SynchronousQueue 的核心属性只有 transferfer,用于
接收Transferer 的实现类对象。
3、SynchronousQueue 使用示例
SynchronousQueue 使用跟前边几个阻塞队列在使用上是一样的,常用的方法还是接口
BlockingQueue 中定义的那几个方法,只是方法的内部含义是不同的。
SynchronousQueue 使用示例如下:
示例一:
public class SynchronousQueueDemo01 {
public static void main(String[] args) throws InterruptedException {
//因为 SynchronousQueue 不存放数据,所以没有长度的概念
SynchronousQueue queue = new SynchronousQueue();
String msg = "SynchronousQueue hello";
//模拟生产者和消费者
//这种场景可能生产者和消费者没匹配成功
new Thread(()->{
boolean b = queue.offer(msg);
System.out.println(b);
},"t1-que").start();
new Thread(() -> {
//消费者
String m = (String) queue.poll();
System.out.println(m);
},"t2-que").start();
//生产者阻塞,等待消费者来匹配
new Thread(()->{
boolean b = false;
try {
//生产者阻塞3s
b = queue.offer(msg,3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" "+b);
},"t3-que").start();
Thread.sleep(300);
new Thread(() -> {
//消费者
String m = (String) queue.poll();
System.out.println(m);
},"t4-que").start();
//消费者阻塞,等待生产者来匹配
new Thread(() -> {
//消费者
String m = null;
try {
//消费者阻塞3s
m = (String) queue.poll(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(m);
},"t5-que").start();
Thread.sleep(300);
new Thread(()->{
boolean b = false;
//生产者阻塞3s
b = queue.offer(msg);
System.out.println(Thread.currentThread().getName()+" "+b);
},"t6-que").start();
示例二:
public class SynchronousQueueDemo02 {
public static void main(String[] args) throws InterruptedException {
/**
* 测试采用哪个 Transfer 的子类来存储数据
* SynchronousQueue.TransferQueue
* SynchronousQueue.TransferStack
*/
//SynchronousQueue queue = new SynchronousQueue();//默认false,不公平
SynchronousQueue queue = new SynchronousQueue(true);//true 公平
//3个生产者和3个消费者
new Thread(() -> {
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
queue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(300);
new Thread(() -> {
try {
System.out.println("消费者 1: "+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("消费者 2: "+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("消费者 3:"+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
4、SynchronousQueue的TransferQueue源码解析
4.1、QNode
QNode 是TransferQueue 的内部类,表示 TransferQueue 中每个节点信息;
TransferQueue中没有使用锁,QNode 通过 volatile+CAS来保证操作数据的线程安全
QNode 结构如下:
static final class QNode {
volatile QNode next; // next node in queue 当前节点的下一个节点
//节点绑定的数据
//若是生产者,item 是生产者输入的数据,若是消费者 item 为null
volatile Object item;
//节点绑定的线程,即当前线程(生产者或消费者线程),线程处于 “挂起/唤醒”状态
volatile Thread waiter;
/**
* 是否是数据节点
* 该节点用于区分是生产者线程,还是消费者线程,基于 item属性是否为null来区分
* todo 注意:
* 最终生产者要将 item 的数据交给消费者,
* 最终消费者线程要获取生产者的item值
*/
final boolean isData; //
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
//+++++++++++++++ 下边省略 CAS 操作 ++++++++++++++
}
4.2、TransferQueue 核心属性和构造函数
TransferQueue 本质上还是一个队列
transient volatile QNode head; //头节点
/** Tail of queue */
transient volatile QNode tail;//尾节点
/**
* 引用一个已取消的节点,该节点可能尚未从队列中取消链接,因为它是取消时最后一个插入的节点。
*/
transient volatile QNode cleanMe;
TransferQueue() {
//初始化时创建一个伪节点,作为头节点
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
}
4.3、transfer(E e, boolean timed, long nanos)
transfer 是 TransferQueue 最核心的方法,其是真正实现生产者 或 消费者匹配的方法。
若生产者调用 transfer,返回值是生产者自身的数据e;
若消费者调用 transfer,返回值是 与生产者匹配后,从生产者获取的数据。
transfer 方法如下:
/**
* 核心方法
* 生产者 参数 e 是生产者发送的数据,生产者调用该方法返回的是他自身的数据
* 消费者 参数 e 为null,但消费者调用该方法会返回与生产者交换后的数据(即生产者的数据)
* timed=false 表示无限阻塞等待,timed=true 表示阻塞时间是第三个参数 nanos
*/
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
//当前 QNode 需要封装当前 生产者/消费者 的信息
QNode s = null; // constructed/reused as needed
//判断是否是数据节点,true=生产者 false=消费者
boolean isData = (e != null);
//自旋
for (;;) {
//获取头节点和尾节点
QNode t = tail;
QNode h = head;
/**
* 若头节点和尾节点有一个为null,表示TransferQueue初始化可能被指令重排序了(此时 TransferQueue还没初始化完成),
* 此时则直接进行下一次判断,
* CPU指令重排后,但最终 TransferQueue 实例化结束后数据是正常的
*/
if (t == null || h == null) // saw uninitialized value
continue; // spin
/**
* h == t 表示当前 TransferQueue 队列既没有生产者也没有消费者,即队列为空,则直接将当前节点入队列;
* 或者 若 h != t,但当前节点与队尾节点是同一种角色,则当前节点也是直接入队列
* t.isData == isData 表示对尾节点与当前节点是同一角色
*/
if (h == t || t.isData == isData) { // empty or same-mode
//拿到尾节点的next
QNode tn = t.next;
//t != tail 表示并发下有其他线程修改了尾节点,则直接进入下一次循环
if (t != tail) // inconsistent read
continue;
//tn != null 说明并发下有其他相同角色线程在我(当前线程)之前入队列了,即尾节点被修改了,但此时可能尾节点还没有更新,
//所以在这里可以调用 advanceTail 帮助前边的线程修改尾节点tail的指向,然后进入下一次循环
if (tn != null) { // lagging tail
//帮助前边的线程修改尾节点tail的指向
advanceTail(t, tn);
continue;
}
//todo 执行到这里表示尾节点没有被其他线程修改,当前节点可以尝试入队列
//判断当前线程是否可以阻塞,timed && nanos <= 0 表示没必要再入队列,直接结束
if (timed && nanos <= 0) // can't wait
return null;
//将当前线程信息包装成 QNode
if (s == null)
s = new QNode(e, isData);
//基于CAS,将tail节点的next指向修改为当前线程的节点,若修改失败则进入下一次循环再次尝试修改
if (!t.casNext(null, s)) // failed to link in
continue;
//基于CAS替换tail的指向
advanceTail(t, s); // swing tail and wait
/**
* 执行到这里,表示当前线程节点已经入队列了
* awaitFulfill 挂起线程,等待匹配 生产者或消费者
* x 是返回替换后的数据
*/
Object x = awaitFulfill(s, e, timed, nanos);
//x == s 表示元素和节点一致,说明节点取消了,则清空节点,并直接结束
if (x == s) { // wait was cancelled
//清空当前节点,将上一个节点的next指向当前节点的next,并将当前节点的next设置为null
clean(t, s);
return null;
}
//isOffList() 判断当前节点是否还在队列中
//若不在则将当前节点设置为head
if (!s.isOffList()) { // not already unlinked
//将当前节点设置为head
advanceHead(t, s); // unlink if head
//x != null 表示当前节点是消费者
if (x != null) // and forget fields
//将当前节点的item设置为自己,表示当前节点没有意义了
s.item = s;
//匹配成功,将线程设置为null
s.waiter = null;
}
//返回数据
return (x != null) ? (E)x : e;
} else { // complementary-mode
/**
* 这里匹配队列中角色
*/
//拿到head 的next,作为匹配节点
QNode m = h.next; // node to fulfill
//并发判断,若 尾节点、头结点的next节点或头节点发生了改变,则直接进行下一次for循环
if (t != tail || m == null || h != head)
continue; // inconsistent read
//获取匹配节点的节点数据
Object x = m.item;
/**
* isData == (x != null) 判断匹配节点与当前线程是否是统一角色
* x == m :节点是可以被取消的,当节点取消后就会将当前 QNode节点的item属性的值设置为当前QNode节点自己,
* 所以当 x == m 表示节点已经取消
* m.casItem(x, e):基于CAS尝试交换数据匹配节点与当前线程节点的数据,(即交换数据),交换成功表示匹配成功,
* 否则表示匹配失败;
* 数据交换失败,表示有并发问题,则重新设置head节点,并进入下一次循环
*
*/
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
//重新设置head节点
advanceHead(h, m); // dequeue and retry
continue;
}
//todo 执行到这里,表示匹配成功
//重新设置head节点
advanceHead(h, m); // successfully fulfilled
//唤醒匹配节点的线程
LockSupport.unpark(m.waiter);
//x != null 表示队列中的head.next是生产者,当前线程是消费者,消费者返回交换后的数据 e
//反之 表示队列中的head.next是消费者,当前线程是生产者,生产者返回交换后的数据 null
return (x != null) ? (E)x : e;
}
}
}
5、SynchronousQueue的TransferStack源码解析
5.1、SNode
SNode 是 TransferStack 的内部类,该类用来表示 TransferStack 栈中的每一个元素。
TransferStack中没有使用锁,SNode 通过 volatile+CAS来保证操作数据的线程安全;
SNode 结构如下:
static final class SNode {
//栈中下一个节点
volatile SNode next;
//与当前节点匹配的节点
volatile SNode match;
//当前节点绑定的线程
volatile Thread waiter;
//节点数据,若节点是消费者,则该属性为null
Object item;
//节点模式:REQUEST、DATA、FULFILLING
int mode;
SNode(Object item) {
this.item = item;
}
//基于CAS修改节点的next
boolean casNext(SNode cmp, SNode val) {
return cmp == next && //表示当前节点没有被其他线程修改
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* 尝试匹配当前调用者节点this与节点s
*/
boolean tryMatch(SNode s) {
if (match == null && //多线程下,当前节点没有匹配其他节点
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {//基于CAS尝试匹配
//若匹配成功则唤醒该节点绑定的线程
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
//判断当前节点是否已经与节点s匹配完成了
return match == s;
}
}
5.2、TransferStack核心属性
TransferStack 是基于栈来实现的, 先入栈的生产者或消费者,先进的后匹配,最后
入栈的先匹配,所以这种方式称为 “非公平”
TransferStack 核心属性如下:
/**
* TransferStack 是一个栈结构,先入栈的生产者或消费者,先进的后匹配,最后入栈的先匹配,
* 所以这种称为不公平
*/
/** Dual stack */
static final class TransferStack<E> extends Transferer<E> {
//SNode的状态(模式),或在节点字段中组合在一起
static final int REQUEST = 0;//表示未被匹配的消费者
/** Node represents an unfulfilled producer */
static final int DATA = 1; //未被匹配的生产者
static final int FULFILLING = 2; //表示节点正在与其他节点匹配,
//栈顶元素
volatile SNode head;
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
}
5.3、transfer(E e, boolean timed, long nanos)
transfer 是 TransferStack 的核心方法,该方法功能是把当前线程数据封装成一个SNode节
点,然后放入栈中;最后与其后边(即next)节点进行匹配,若匹配成功,则将当前节点与
其next节点都从栈中移除;若匹配失败,则阻塞;
transfer 方法代码如下:
/**
* todo 核心方法
* 新增数据到栈中
* 先把当前节点放入栈中,然后与其next节点进行匹配,若匹配成功,则修改栈顶元素;
* 若匹配失败,则 当前节点与其next节点都从栈中移除
*
* 参数:
* e: 若调用者是生产者,则e不为null,e是生产者准备发送的数据
* 若调用者是消费者,则e 等于null
* timed: true=表示带超时时间,false=表示不带超时时间
* nanos:超时时间
*
* 返回值:
* 若调用者是生产者,返回值是他自身的数据e
* 若调用者是消费者,返回值是与其匹配的生产者的数据
*
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
//判断当前是消费者还是生产者
int mode = (e == null) ? REQUEST : DATA;
//自旋
for (;;) {
SNode h = head;
/**
* h == null:表示栈为空,则直接入栈
* h.mode == mode :表示当前数据与栈顶数据是同一角色,同一角色不能匹配,则直接入栈
*/
if (h == null || h.mode == mode) { // empty or same-mode
//若当前数据是带超时时间阻塞,但超时时间小于等于0,表示当前数据已经是不能等待(即数据已经是无意义的,也不能入栈,)
//但还是要看下当前栈顶数据是否被其他线程取消(若被其他线程匹配成功后,会从栈中取消),若被取消,则修改栈顶
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
//更新栈顶元素
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {//先把当前线程包装成SNode节点对象,然后
//调用 casHead 将节点s放入栈中
//阻塞,等待生产者或消费者来匹配
//todo 注意:若节点被取消,这里返回的是当前节点自己
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // 表示节点被取消,则从栈中清除节点s,然后返回null
clean(s);
return null;
}
/**
* todo 注意:执行到这里节点s已经匹配成功;
* 但此时头节点不为null且 h.next等于节点s ,这说明
* 在节点s匹配过程中,有与s同角色其他线程进来了并作为头结点h,且节点h与s匹配成功
* ,匹配成功后需要把s和h都从栈中移除
*/
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
/**
* 匹配成功,若当前节点是消费者,则返回与其匹配者的数据(即与其匹配的生产者的数据),
* 若当前节点是生产者,则返回自身的数据
*/
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill 栈中有数据,但栈顶元素不是处于正在匹配中
//如果栈顶元素已经取消,则更新栈顶
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//将当前线程包装成SNode,并放入栈
//自旋
for (;;) { // loop until matched or waiters disappear
/**
* 当前节点s入栈后,s就是栈顶元素
* s.next == null 表示s是取消状态,此时需要清空栈,
*/
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
/**
* 尝试把m与s匹配,若匹配成功,则m与s都需要从栈中移除
*/
SNode mn = m.next;
if (m.tryMatch(s)) {
//匹配成功后将 m、s从栈中移除
casHead(s, mn); // pop both s and m
/**
* 匹配成功,若当前节点是消费者,则返回与其匹配者的数据(即与其匹配的生产者的数据),
* 若当前节点是生产者,则返回自身的数据
*/
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
/**
* m与s匹配失败,则将s的next 修改成mn,进行下一次匹配
*/
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
SNode m = h.next; // m is h's match
//此时栈中只有h一个元素,h已经无法匹配,则清空栈
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
/**
* 尝试h与m匹配,若匹配成功,则修改栈顶元素,若匹配失败,则修改h.next
*/
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}