目录
一、简介
二、基本原理
三、jdk8
内部属性
4个常量值
transfer
tryAppend
take()
awaitMatch
boolean remove(Object o)
四、jdk17
主要参数
put/offer
take()
remove()
五、与synchronousqueue 区别
六、知识小结
一、简介
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,
LinkedTransferQueue多了tryTransfer和transfer方法。可以算是 LinkedBolckingQueue 和
SynchronousQueue 的合体。LinkedTransferQueue是一种无界阻塞队列,底层基于单链表实现,其内部节点分
为数据结点、请求结点;基于CAS无锁算法实现
二、基本原理
LinkedTransferQueue 消费者线程获取取数据时:调用take poll 等方法
如果队列不为空,则直接取走数据,若队列为空则消费者线程会生成一个占位虚拟节点(节点元素为null)入队,
并等待在这个节点上,后面生产者线程请求添加数据时,会从单向链表的head节点开始遍历,如果发现某个节点
是一个取数请求任务类型的节点(即是这个节点的isData为false,item == null),生产者线程就不入队了,直
接就将元素填充到该节点(元素传递给它),并唤醒该节点等待的消费者线程,被唤醒的消费者线程取走元素 ;
LinkedTransferQueue 生产者线程传递数据时:调用transfer方法
- 当有消费者线程阻塞等待时,调用transfer方法的生产者线程不会将元素存入队列,而是直接将元素传递给消费者,并唤醒阻塞的线程;
- 如果调用transfer方法的生产者线程发现没有正在等待的消费者线程,则这个生产者请求创建一个节点,这个节点将会被添加到当前链表的末尾将数据入队,然后会阻塞等待,直到有一个消费者线程来获取该元素。
LinkedTransferQueue内部链表上的有效节点,要么全部都是由取数请求创建的节点,其isData为false,item属性为null;要么就全部
都是由存储请求创建的节点,其isData为true,item属性不为null ,只需要由head开始找到第一个有效节点判定是否可以存储/添加
数据,因为只要存在生产者或者消费者在队列时,对应的消费者或者生产者就不会入队列,也就是说二者只有一个会在队列,如果生产者
在队列,消费者来取数据就会唤醒它,反之消费者在队列,生产者也会唤醒消费者线程
该类实现了一个 TransferQueue
public interface TransferQueue<E> extends BlockingQueue<E> {
// 如果可能,立即将元素转移给等待的消费者。
// 如果存在消费者已经等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则返回 false。
boolean tryTransfer(E e);
// 将元素转移给消费者,如果需要的话等待。
// 如果存在一个消费者已经等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则等待直到元素由消费者接收。
void transfer(E e) throws InterruptedException;
// 上面方法的基础上设置超时时间
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 如果至少有一位消费者在等待,则返回 true
boolean hasWaitingConsumer();
// 返回等待消费者人数的估计值
int getWaitingConsumerCount();
}
tryTransfer(E e)
当生产者线程调用tryTransfer方法时,如果没有消费者等待接收元素,则会立即返回false。该方法和transfer方法的区别就是
tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法必须等到消费者消费后才返回。
tryTransfer(E e, long timeout, TimeUnit unit)
加上了限时等待功能,如果没有消费者消费该元素,则等待指定的时间再返回;如果超时还没消费元素,则返回false,如果在超时时间内
消费了元素,则返回true。
三、jdk8
内部节点node
static final class Node {
// 如果是消费者请求的节点,则isData为false,否则该节点为生产(数据)节点为true
final boolean isData; // false if this is a request node
// 数据节点的值,若是消费者节点,则item为null
volatile Object item; // initially non-null if isData; CASed to match
// 指向下一个节点
volatile Node next;
// 等待线程
volatile Thread waiter; // null until waiting
// CAS设置next
final boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// CAS设置item
final boolean casItem(Object cmp, Object val) {
// assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 构造方法
Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
// 将next指向自己
final void forgetNext() {
UNSAFE.putObject(this, nextOffset, this);
}
// 匹配失败或者节点被取消的时候会调用,设置item自连接,waiter为null
final void forgetContents() {
UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}
// 节点是否被匹配过了
final boolean isMatched() {
Object x = item;
return (x == this) || ((x == null) == isData);
}
// 是否是一个未匹配的请求节点
// 如果是的话,则isData为false,且item为null,因为如果被匹配过了,item就不再为null,而是指向自己
final boolean isUnmatchedRequest() {
return !isData && item == null;
}
// 如果给定节点不能连接在当前节点后则返回true
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
// 匹配一个数据节点
final boolean tryMatchData() {
// assert isData;
Object x = item;
//将数据节点item修改为null
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter);
return true;
}
return false;
}
private static final long serialVersionUID = -3375979862319811754L;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
private static final long waiterOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
waiterOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiter"));
} catch (Exception e) {
throw new Error(e);
}
}
}
static final class Node {
// 如果是消费者请求的节点,则isData为false,否则该节点为生产(数据)节点为true
final boolean isData; // false if this is a request node
// 数据节点的值,若是消费者节点,则item为null
volatile Object item; // initially non-null if isData; CASed to match
// 指向下一个节点
volatile Node next;
// 等待线程
volatile Thread waiter; // null until waiting
// CAS设置next
final boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// CAS设置item
final boolean casItem(Object cmp, Object val) {
// assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 构造方法
Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
// 将next指向自己
final void forgetNext() {
UNSAFE.putObject(this, nextOffset, this);
}
// 匹配失败或者节点被取消的时候会调用,设置item自连接,waiter为null
final void forgetContents() {
UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}
// 节点是否被匹配过了
final boolean isMatched() {
Object x = item;
return (x == this) || ((x == null) == isData);
}
// 是否是一个未匹配的请求节点
// 如果是的话,则isData为false,且item为null,因为如果被匹配过了,item就不再为null,而是指向自己
final boolean isUnmatchedRequest() {
return !isData && item == null;
}
// 如果给定节点不能连接在当前节点后则返回true
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
// 匹配一个数据节点
final boolean tryMatchData() {
// assert isData;
Object x = item;
//将数据节点item修改为null
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter);
return true;
}
return false;
}
private static final long serialVersionUID = -3375979862319811754L;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
private static final long waiterOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
waiterOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiter"));
} catch (Exception e) {
throw new Error(e);
}
}
}
- Node结点有两种类型:数据结点、请求结点,通过字段isData区分,只有不同类型的结点才能相互匹配;
- Node结点的值保存在item字段,匹配前后值会发生变化;
- 数据节点是针对生产者而言,请求节点是对应消费者线程
结点/状态 | 数据结点 | 请求结点 |
匹配前 | isData = true; item = 数据结点值 | isData = false; item = null |
匹配后 | isData = true; item = null | isData = false; item = this(匹配后自连接) |
对于一个数据结点,当item == null
表示匹配成功;对于一个请求结点,当item == this
表示匹配成功。
归纳起来,匹配成功的结点Node就是满足(Node.item == this) || ((Node.item == null) == Node.isDa
ta)
内部属性
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {
/**
* True如果是多核CPU
*/
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
/**
* 线程自旋次数(仅多核CPU时用到).
*/
private static final int FRONT_SPINS = 1 << 7;
/**
* 线程自旋次数(仅多核CPU时用到).
*/
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/**
* 断开被删除节点失败的次数时可容忍的预估计删除失败的最大值。
*/
static final int SWEEP_THRESHOLD = 32;
/**
* 队首结点指针.
*/
transient volatile Node head;
/**
* 队尾结点指针.
*/
private transient volatile Node tail;
/**
* 断开被删除节点失败的次数
*/
private transient volatile int sweepVotes;
// CAS设置队尾tail指针为val
private boolean casTail(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
// CAS设置队首head指针为val
private boolean casHead(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
private boolean casSweepVotes(int cmp, int val) {
return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
}
/*
* xfer方法的入参, 不同类型的方法内部调用xfer方法时入参不同.
xfer方法的how参数的可能取值
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long sweepVotesOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = LinkedTransferQueue.class;
headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));
sweepVotesOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("sweepVotes"));
} catch (Exception e) {
throw new Error(e);
}
}
}
4个常量值
/*
* xfer方法的入参, 不同类型的方法内部调用xfer方法时入参不同.
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfe
这四个常量值,作为xfer方法的入参,用于标识不同操作类型:
NOW=0表示即时操作(可能失败),即不会阻塞调用线程:
poll(获取并移除队首元素,如果队列为空,直接返回null);
tryTransfer(尝试将元素传递给消费者,如果没有等待的消费者,则立即返回false,也不会将元素入队)
ASYNC=1表示异步操作(必然成功):
xfer被操作线程调用时,无论xfer操作过程时候完成,调用者都不会进行阻塞等待。
offer,put,add(插入指定元素至队尾,由于是无界队列,所以会立即返回true);
SYNC=2表示同步操作(阻塞调用线程):
只有xfer操作过程达到了调用线程所期望的结果(或抛出异常),调用者才会继续向下执行,否则就一直处于阻塞状态下。
transfer(阻塞直到出现一个消费者线程);take(从队首移除一个元素,如果队列为空,则阻塞线程)
TIMED=3表示限时同步操作(限时阻塞调用线程):
poll(long timeout, TimeUnit unit);
tryTransfer(E e, long timeout, TimeUnit unit)
transfer
transfer方法,用于将指定元素e传递给消费者线程(调用take/poll方法)。如果有消费者线程正在阻塞等待,则调用transfer方法的线程会
直接将元素传递给它;如果没有消费者线程等待获取元素,则调用transfer方法的线程会将元素插入到队尾,然后阻塞等待,直到出现一
个消费者线程获取元素:
/**
* 将指定元素e传递给消费者线程(调用take/poll方法).
*/
public void transfer(E e) throws InterruptedException {
//对应生产者而言item 数据被取走肯定会被修改为null,当xfer 返回不为null时说明出现异常被中断了
if (xfer(e, true, SYNC, 0) != null) {
// 进入到此处, 说明调用线程被中断了
Thread.interrupted(); // 清除中断状态, 然后抛出中断异常
throw new InterruptedException();
}
}
transfer方法的内部实际是调用了xfer方法,入参为SYNC=2:
/**
* 入队/出队元素的真正实现.
*
* @param e 入队操作, e非null; 出队操作, e为null
* @param haveData true表示入队元素, false表示出队元素
* @param how NOW, ASYNC, SYNC, TIMED 四种常量定义
* @param nanos 限时模式下使用(纳秒)
* @return 匹配成功则返回匹配的元素, 否则返回e本身
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//从head开始找一个未匹配的互补节点尝试进行匹配交换数据
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData; // p节点的模式
Object item = p.item;
// 同一时刻队列中只会存储一种类型的节点
// 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
// 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
// p没有被匹配到,并且节点模式合符规定,注意对于消费匹配完成item会等于this
if (item != p && (item != null) == isData) {
// 当前节点与待处理数据模式相同,不能匹配,重新开始
if (isData == haveData)
break;
// 到这里说明当前节点与待处理数据模式不同,进行匹配即交换数据。
//修改e的值以完成数据匹配:入队操作, e非null; 出队操作, e为null
//如果是消费者来匹配则p节点肯定是生产者线程节点,则修改为null(消费者线程携带的e=null),反之如果是生产者来匹 //配,p节点为消费者节点,修改为item=e
if (p.casItem(item, e)) {
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
// 如果head还没变,且如果当前节点已经是最后一个节点了则head指向该节点,否则指向该节点的下一个节点
// 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
// 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
// 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext(); //让原head的next指向自身,形成自链接,也就是从单链表中删除了
break;
}
// 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // if条件成立说明head距离第一个未匹配节点没有超过1,所以不需要更新head
}
//匹配完成之后唤醒被阻塞在当前节点的线程,返回节点数据
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
// p已经被匹配了或者尝试匹配的时候失败了
// 也就是其它线程先一步匹配了p
// 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
// 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
//到这里说明没有找到可以匹配的节点
//how不为NOW(put,offer,add,take、超时poll,transfer,超时tryTransfer),说明需要入队
// NOW,立即返回,没有匹配到立即返回,不做入队操作
// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData); //创建节点实例
Node pred = tryAppend(s, haveData); //尝试加入队尾,返回其前驱节点
if (pred == null) //前驱为null,说明有与其互补的未匹配节点入队
continue retry; // 这个时候需要重新尝试匹配
if (how != ASYNC) //how 不是异步即是同步或者超时等待(take,超时poll,transfer,超时tryTransfer),说明需要阻塞等待
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; //不需要等待直接返回数据e(put,offer,超时offer,add入队之后返回;poll,tryTransfer不入队返回)
}
找到 head 节点,如果 head 节点是匹配的操作,就直接赋值,如果不是,添加到队列中
tryAppend
/**
* 尝试将节点s追加为尾部。
* 返回值:
* 1. 队列为空,刚刚入队的s是队列中唯一的节点,返回s本身
* 2. 队列不为空,成功将s链接到最后一个节点p之后,返回s的前驱p
* 3. 队列不为空,但是队列中存在与其互补的未匹配节点,返回null
*/
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
//队列为空,则直接将s设置成head,返回s本身
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s; // initialize
}
//当前节点p是一个模式互补且未被匹配的节点则不能链接到该节点之后
//因为它完全可以和节点s完成匹配使它们都返回
else if (p.cannotPrecede(haveData))
return null;
//当前节点p不是实际的最后一个节点,继续循环寻找最后一个节点
else if ((n = p.next) != null)
p = p != t && t != (u = tail) ? (t = u) : // tail被更新了则取新的tail
(p != n) ? n : null; // 取p的下一个节点或者若p已经失效重新从head开始
//p是最后一个节点,将s链接到它的下一个节点
//如果被其它线程抢先入队则p指向其next继续循环
else if (!p.casNext(null, s))
// 如果CAS更新s为p的next失败
// 则说明有其它线程先一步更新到p的next了
// 就让p指向p的next,重新尝试让s入队
p = p.next;
else {
//到这里说明成功将s链接到p的next,
// 如果p不等于t,就更新tail指针
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) && //tail还没被更新则更新指向新的尾节点s
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p; //返回s的前驱
}
}
}
take()
方法会从队首取出一个元素,如果队列为空,则线程会阻塞:
/** * 从队首出队一个元素. */
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0); // (e == null && isData=false)表示一个请求结点
if (e != null) // 如果e!=null, 则表示匹配成功, 此时e为与之匹配的数据结点的值
return e;
Thread.interrupted(); throw new InterruptedException();
}
内部依然调用了xfer方法,不过此时入参有所不同,由于是消费线程调用,
所以入参 e == null && hasData == false,表示一个“请求结点”
awaitMatch
/**
* 自旋/yield/阻塞,直到结点s被匹配.
*
* @param s 等待被匹配的结点s
* @param pred s的前驱结点或s自身(队列中只有一个结点的情况)
* @param e 结点s的值
* @return 匹配值, 或e本身(中断或超时情况)
*/
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L; // 限时等待情况下使用
Thread w = Thread.currentThread();
int spins = -1; // 自旋次数, 锁优化操作
ThreadLocalRandom randomYields = null; // bound if needed
for (; ; ) {
Object item = s.item;
if (item != e) { // 数据已经发生变化说明已经被匹配了
// assert item != s;
// 把s的item更新为s本身
// 并把s中的waiter置为空
s.forgetContents(); //避免垃圾引用持有
return LinkedTransferQueue.<E>cast(item);
}
//如果发生了中断或超时,则取消节点,即将数据item指向自身,返回e
if ((w.isInterrupted() || (timed && nanos <= 0))
&& s.casItem(e, s)) {
unsplice(pred, s);//若发生了超时或中断,在返回之前还需要断开该节点的链接也就是删除s节点
return e;
}
//到这里说明节点没有发生异常,要进行阻塞
// 如果自旋次数小于0,就计算自旋次数
if (spins < 0) {
// spinsFor()计算自旋次数
// 如果前面有节点未被匹配就返回0
// 如果前面有节点且正在匹配中就返回一定的次数,等待
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
} else if (spins > 0) { // 自选次数减1
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // 随机yield线程
} else if (s.waiter == null) { // waiter保存待阻塞线程
s.waiter = w;
} else if (timed) { // 限时等待情况, 计算剩余有效时间
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
} else { // 阻塞线程
LockSupport.park(this);
}
}
}
//返回具有给定前驱节点和数据模式的节点s的自旋次数
private static int spinsFor(Node pred, boolean haveData) {
if (MP && pred != null) { //是多处理器才需要自旋
if (pred.isData != haveData)
return FRONT_SPINS + CHAINED_SPINS;
// 前驱已经被匹配了,即当前节点是第一个自旋节点,自旋次数为FRONT_SPINS
if (pred.isMatched())
return FRONT_SPINS;
// 前驱也处于自旋状态,则自旋次数为其一半
if (pred.waiter == null)
return CHAINED_SPINS;
}
return 0; //单核CPU不需要自旋,自旋次数为0
}
final void unsplice(Node pred, Node s) {
s.forgetContents(); // 防止垃圾引用持有
/*
* 1. 如果前驱依然指向s,尝试断开与s的链接。
* 2. 如果操作失败(由于s是尾节点或者前驱已经断开了),并且前驱和s都不是head也没有出队,则积累失败次数
* 3. 当失败次数累计到临界值就进行清理
*/
if (pred != null && pred != s && pred.next == s) {//前驱的依然next指向s
Node n = s.next;
//s是尾节点或者前驱已经被匹配
if (n == null ||
(n != s && pred.casNext(s, n) && pred.isMatched())) {
// 看是否是head或将要成为新的head,根据需要更新head指向第一个未匹配节点
for (;;) {
Node h = head;
if (h == pred || h == s || h == null)
return; // 是头节点或者队列为空,直接返回
if (!h.isMatched()) //head未被匹配,则不需对head进行处理
break;
Node hn = h.next;
if (hn == null)
return; // 队列为空,返回
if (hn != h && casHead(h, hn)) //使head指向第一个未匹配节点
h.forgetNext();
}
//重新检查节点是否已经出队,若没有对失败次数进行累计,
//当失败次数达到临界值SWEEP_THRESHOLD,执行sweep进行清理
//sweep就是从head开始遍历清除队列中那些已经匹配过的节点
if (pred.next != pred && s.next != s) {
for (;;) {
int v = sweepVotes;
if (v < SWEEP_THRESHOLD) {
if (casSweepVotes(v, v + 1))
break;
}
else if (casSweepVotes(v, 0)) {
sweep();
break;
}
}
}
}
}
}
LinkedTransferQueue其实兼具了SynchronousQueue的特性以及无锁算法的性能,并且是一种无界队列:
- 和SynchronousQueue相比,LinkedTransferQueue可以存储实际的数据;
- 和其它阻塞队列相比,LinkedTransferQueue直接用无锁算法实现,性能有所提升。
另外,由于LinkedTransferQueue可以存放两种不同类型的结点,所以称之为“Dual Queue”:
内部Node结点定义了一个 boolean 型字段——isData,表示该结点是“数据结点”还是“请求结点”
在结点被匹配(被删除)之后,不会立即更新队列的head、tail,而是当 head、tail结点与最近一个未匹配的结点之间的距离超过“松弛阀
值”后才会更新(默认为 2)。这个“松弛阀值”一般为1到3,如果太大会增加沿链表查找未匹配结点的时间,太小会增加 CAS 的开销。
boolean remove(Object o)
//从此队列中删除指定元素的单个实例(如果存在)。
//内部节点移除方法,就是找到队列中第一个数据item与其相等(item.equals(o))的节点并移除,通过源码可见移除是通过伪匹配实现的,即伪造成被请求线程匹配,然后唤醒对应的阻塞线程,尝试断开该节点与其前驱的链接。
public boolean remove(Object o) {
return findAndRemove(o);
}
private boolean findAndRemove(Object e) {
if (e != null) {
for (Node pred = null, p = head; p != null; ) {
Object item = p.item;
//如果是数据节点
if (p.isData) {
//与数据节点进行数据比较,找到之后进行匹配,
//并唤醒对应的阻塞线程,然后尝试断开节点与其前驱的链接,返回true
if (item != null && item != p && e.equals(item) &&
p.tryMatchData()) {
unsplice(pred, p);
return true;
}
}
//第一个节点若不是数据节点,则表示队列中都是请求节点。直接返回false
else if (item == null)
break;
//到这里说明队列中是数据节点,但是当前遍历节点数据不是目标数据
//更新遍历指针到下一个节点,或者从head重新开始遍历(当前节点已经失效)
pred = p;
if ((p = p.next) == pred) { // stale
pred = null;
p = head;
}
}
}
return false;
}
四、jdk17
jdk 17 与jdk8 LinkedTransferQueue思想基本一致;主要在部分方法实现存在区别
jdk17 node节点
static final class Node implements ForkJoinPool.ManagedBlocker {
final boolean isData; // 如果没有存储数据,那么说明这个节点记录的是一个消费者请求否则就是记录的消费者请求
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter; // null when not waiting for a match
// 如果data持有的item 不为null 则创建数据节点,否则创建请求节点
Node(Object item) {
ITEM.set(this, item);
isData = (item != null);
}
//创建一个匹配数据的虚拟节点,主要用于队列为空,生成一个虚拟占位节点(节点元素为null)入队,然后消费者线程被等待在这个节点上
//等待生产者线程放入数据到item
Node() {
isData = true;
}
final boolean casNext(Node cmp, Node val) {
// assert val != null;
return NEXT.compareAndSet(this, cmp, val);
}
final boolean casItem(Object cmp, Object val) {
// assert isData == (cmp != null);
// assert isData == (val == null);
// assert !(cmp instanceof Node);
return ITEM.compareAndSet(this, cmp, val);
}
/**
* 将节点链接到自己,以避免垃圾引用持有,cas 修改头部节点属性以后访问
*/
final void selfLink() {
// assert isMatched();
NEXT.setRelease(this, this);
}
final void appendRelaxed(Node next) {
// assert next != null;
// assert this.next == null;
NEXT.setOpaque(this, next);
}
/**
* 节点是否匹配过了
*/
final boolean isMatched() {
return isData == (item == null);
}
/** Tries to CAS-match this node; if successful, wakes waiter. */
final boolean tryMatch(Object cmp, Object val) {
if (casItem(cmp, val)) {
LockSupport.unpark(waiter);
return true;
}
return false;
}
// 如果给定节点不能连接在当前节点后则返回true,因为此节点是无法匹配的的并且节点的数据模式相反
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
return d != haveData && d != (item == null);
}
public final boolean isReleasable() {
return (isData == (item == null)) ||
Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
private static final long serialVersionUID = -3375979862319811754L;
}
主要参数
/**
* 使用带超时的park自旋的纳秒数
*/
static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;
/**
断开被删除节点失败的次数时可容忍的预估计删除失败的最大值。此参数感觉没啥用,全程没使用到,估计是修改以前jdk没有删除
*/
static final int SWEEP_THRESHOLD = 32;
put/offer
public void put(E e) {
xfer(e, true, ASYNC, 0L);
}
public boolean offer(E e) {
xfer(e, true, ASYNC, 0L);
return true;
}
//haveData 表示xfer方法的调用是否有数据对象通过上一个e参数进行传入,也就是说e和haveData这两个参数是配对使用的。当e为null时,haveData应该为false;反之当e不为nul时,haveData应该为true;
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
restart: for (Node s = null, t = null, h = null;;) {
//判断当前操作是入队操作还是出队操作
//当前xfer操作的性质(haveData)和当前链表tail引用位置所描述的操作性质(t.isData)一致则从队尾入队
//反之不一致则从head位置开始判定和进行的出队操作:
for (Node p = (t != (t = tail) && t.isData == haveData) ? t
: (h = head);; ) {
final Node q; final Object item;
// 出队操作p.isData != haveData 说明队列里节点模式和当前入参节点模式不同
//haveData == ((item = p.item) == null) 判断数据是否已经匹配
//如果当前线程是生产者线程那么haveData=true,匹配的p节点就是消费者节点,没有匹配时p.item = null
//如果是消费者线程那么haveDate=false,p节点就是生产者线程,没有匹配时p.item 必然不为null
if (p.isData != haveData
&& haveData == ((item = p.item) == null)) {
//将局部变量h引用与当前单向链表的head位置,避免在多线程情况下head引用被改变引起的处理错误
if (h == null) h = head;
// 尝试匹配数据, 如果是生产者任务从队列中取出,那么赋值成功后,当前节点p的item属性将为e(不会为null)
// 如果是消费者任务从队列中取出,那么赋值成功后,当前节点p的item属性将为null
if (p.tryMatch(item, e)) {
//如果条件成立,就要进行以h代表的节点为基准的链表清理操作
//经过tryMatch cas的item已经修改,此时isData属性的值和item属性拥有值的真实情况是相悖的节点
//需要清理出队列
//h 是头结点,p是需要清理的结束节点
if (h != p) skipDeadNodesNearHead(h, p);
return (E) item;
}
}
// 入队操作的场景
// 加入队列的可能是消费者任务,也可能是生产者任务
// 根据之前对单向链表tail引用位置的描述,tail引用的位置不一定是单向链表的最后一个节点
// 所以首先将p节点移动到链表的最后一个节点,否则就不进行业务逻辑处理
if ((q = p.next) == null) {
// 操作方式为NOW的即时入队操作,将会被忽略
if (how == NOW) return e;
if (s == null) s = new Node(e);//入队操作需要生成一个新的Node节点
if (!p.casNext(null, s)) continue;//将当前操作s结点引用到当前p结点的item属性,入队尾
// 引起这个的原因可能有很多:
// a、当前xfer操作在中为p节点关联next属性的操作:p.casNext(null, s)不停失败,
// 不停的在第二层for循环中做q = p.next 和 p == (p = q) 操作
// b、虽然xfer操作成功了,但是当前线程连续进行了两次xfer调用操作
if (p != t) casTail(t, s);
// put模式都是 ASYNC,所以put线程不用阻塞直接返回继续执行。相当于异步交付任务
if (how == ASYNC) return e;
// 是take操作,操作码为 SYNC,跳入此方法执行,awaitMatch 等待匹配
// 先自旋一段时间,然后调用LockSupport.park
return awaitMatch(s, p, e, (how == TIMED), nanos);
}
//让p引用指向当前节点的下一个节点
// 如果当前节点的next属性指向自己,说明当前节点已经被其他线程修改了自连接已经变成了无效状态(p节点已经出队列)
if (p == (p = q)) continue restart;
}
}
}
// h变量表示清理的开始(节点)位置
// p变量表示清理的结束(节点)位置,p所引用的Node节点一定是一个无效节
private void skipDeadNodesNearHead(Node h, Node p) {
// assert h != null;
// assert h != p;
// assert p.isMatched();
//找到单向链表中离链表头部最近的有效节点
for (;;) {
final Node q;
// 如果清理过程发现已经达到当前链表的最后一个节点,则退出
if ((q = p.next) == null) break;
// 如果q变量指向的Node节点是有效的,就说明已找到了单向链表中离链表头部最近的有效节点了,不需要再继续向后找
else if (!q.isMatched()) {
p = q; break;
}
// 如果以上条件不成立,则还是要将q变量的值赋给p,而且通过循环,继续向链表的后续结点寻找。
//如果p节点出现了自循环的情况,这种情况代表p已经被其它线程的调用过程清理出了队列,那么直接退出处理即可
else if (p == (p = q)) return;
}
//重新设置单向链表的head属性的对象引用位置,并将原来h变量引用的Node节点设置为自循环,会被gc回收
if (casHead(h, p))
h.selfLink();
}
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
//似曾相识,借用SynchronousQueue 实现
final boolean isData = s.isData;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
final Thread w = Thread.currentThread();
int stat = -1; // -1: may yield, +1: park, else 0
Object item;
//对于生产者当前节点的item 没有被取走 条件一直成立,取走了item=null会退出
//对于消费者线程匹配完item !=null ,e=null直接退出
while ((item = s.item) == e) {
if (needSweep) // help clean
sweep();
//设置超时且超时时间到了或者当前线程被中断则 取消:从队列移除s节点
else if ((timed && nanos <= 0L) || w.isInterrupted()) {
if (s.casItem(e, (e == null) ? s : null)) {
unsplice(pred, s); // cancelled
return e;
}
}
else if (stat <= 0) {
//pred.next == s 说明没有匹配完成
if (pred != null && pred.next == s) {
//节点模式不同或者pred已经匹配过了
if (stat < 0 &&
(pred.isData != isData || pred.isMatched())) {
stat = 0; // yield once if first
Thread.yield();
}
else {
//这里说明还没有匹配,需要设置阻塞线程为w
stat = 1;
s.waiter = w; // enable unpark
}
} // else signal in progress
}
// 如果是生产者或者消费者没有完成匹配,item 和e 是相等的,不同说明已经完成匹配,直接退出
else if ((item = s.item) != e)
break; // recheck
else if (!timed) {//不带超时的需要阻塞
LockSupport.setCurrentBlocker(this);
try {
ForkJoinPool.managedBlock(s);
} catch (InterruptedException cannotHappen) { }
LockSupport.setCurrentBlocker(null);
}
else {
nanos = deadline - System.nanoTime();
//待超时的超过自旋时间阈值也需要阻塞的
if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanos);
}
}
//阻塞醒来的清理操作
if (stat == 1)
WAITER.set(s, null);
if (!isData)
ITEM.set(s, s); // self-link to avoid garbage
return (E) item;
}
final void unsplice(Node pred, Node s) {
// assert pred != null;
// assert pred != s;
// assert s != null;
// assert s.isMatched();
// assert (SWEEP_THRESHOLD & (SWEEP_THRESHOLD - 1)) == 0;
s.waiter = null; // disable signals
/*
* 1. 如果前驱依然指向s,尝试断开与s的链接。
* 2. 如果操作失败(由于s是尾节点或者前驱已经断开了),并且前驱和s都不是head也没有出队,设置清理标志位为true
*/
if (pred != null && pred.next == s) {//前驱的依然next指向s
Node n = s.next;
//s是尾节点或者前驱已经被匹配
if (n == null ||
(n != s && pred.casNext(s, n) && pred.isMatched())) {
for (;;) { // 看是否是head或将要成为新的head,根据需要更新head指向第一个未匹配节点
Node h = head;
if (h == pred || h == s)
return; // 是头节点或者队列为空,直接返回
if (!h.isMatched()) //head未被匹配,则不需对head进行处理
break;
Node hn = h.next;
if (hn == null)
return; // 队列为空,返回
if (hn != h && casHead(h, hn))//使head指向第一个未匹配节点
h.selfLink(); //清理旧的head
}
// 需要清理设置清理标志位true
if (pred.next != pred && s.next != s)
needSweep = true;
}
}
}
take()
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0L);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
remove()
public boolean remove(Object o) {
if (o == null) return false;
restartFromHead:
for (;;) {
for (Node p = head, pred = null; p != null; ) {
Node q = p.next;
final Object item;
//(item = p.item) != null和p.isData说明是生产者线程没匹配之前
if ((item = p.item) != null) {
if (p.isData) {//ture说明是生产者线程匹配之前的删除
// 移除元素的时候,把等待队列中的元素删除的同时要保证线程不被继续阻塞
// (因为有可能此时等待队列中的为take线程),
// 所以仍然需要调用 tryMatch方法中的LockSupport.unpark
if (o.equals(item) && p.tryMatch(item, null)) {
skipDeadNodes(pred, p, p, q);//处理pred 到 q之间无效待清理的节点
return true;
}
pred = p; p = q; continue;
}
}
//p消费者线程没有匹配。退出,给予重新匹配机会
else if (!p.isData)
break;
//p消费者线程已经匹配,处理已经失效待清理的节点
for (Node c = p;; q = p.next) {
if (q == null || !q.isMatched()) {
pred = skipDeadNodes(pred, c, p, q); p = q; break;
}
//如果其他线程已经修改p为自连接,已经清理了重试
if (p == (p = q)) continue restartFromHead;
}
}
return false;
}
}
private Node skipDeadNodes(Node pred, Node c, Node p, Node q) {
// assert pred != c;
// assert p != q;
// assert c.isMatched();
// assert p.isMatched();
if (q == null) {
// Never unlink trailing node.
if (c == p) return pred;
q = p;
}
return (tryCasSuccessor(pred, c, q)
&& (pred == null || !pred.isMatched()))? pred : p;
}
private boolean tryCasSuccessor(Node pred, Node c, Node p) {
// assert p != null;
// assert c.isData != (c.item != null);
// assert c != p;
if (pred != null)
return pred.casNext(c, p);
if (casHead(c, p)) {
c.selfLink();
return true;
}
return false;
}
五、与synchronousqueue 区别
LinkedTransferQueue 如果有消费者线程存在则生产者线程将数据传递到占位节点并唤醒消费者线程,没有消费
者线程等待则实现阻塞直到消费者取元素,而其他方法不阻塞,synchronousqueue 只有阻塞直到消费者获取元
素。这个过程中队列不存数据,直接等到消费者来获取时交给了消费者。