这里续写上一章博客
Phaser新特性 :
特性1:动态调整线程个数
CyclicBarrier 所要同步的线程个数是在构造方法中指定的,之后不能更改,而 Phaser 可以在运行期间动态地 调整要同步的线程个数,Phaser 提供了下面这些方法来增加、减少所要同步的线程个数
register ( )
bulkRegister ( int parties)
arriveAndDeregister ( )
public int register ( ) {
return doRegister ( 1 ) ;
}
public int bulkRegister ( int parties) {
if ( parties < 0 )
throw new IllegalArgumentException ( ) ;
if ( parties == 0 )
return getPhase ( ) ;
return doRegister ( parties) ;
}
public int arriveAndDeregister ( ) {
return doArrive ( ONE_DEREGISTER ) ;
}
private static final int ONE_ARRIVAL = 1 ;
private static final int ONE_PARTY = 1 << PARTIES_SHIFT ;
private static final int ONE_DEREGISTER = ONE_ARRIVAL | ONE_PARTY ;
private static final int EMPTY = 1 ;
private static final int PARTIES_SHIFT = 16 ;
特性2:层次Phaser
多个Phaser可以组成如下图所示的树状结构,可以通过在构造方法中传入父Phaser来实现
public Phaser ( Phaser parent, int parties) {
}
先简单看一下Phaser内部关于树状结构的存储,如下所示:
private final Phaser parent;
可以发现,在Phaser的内部结构中,每个Phaser记录了自己的父节点,但并没有记录自己的子节点列表,所 以,每个 Phaser 知道自己的父节点是谁,但父节点并不知道自己有多少个子节点,对父节点的操作,是通过子节 点来实现的
树状的Phaser怎么使用呢?考虑如下代码,会组成下图的树状Phaser:
Phaser root = new Phaser ( 2 ) ;
Phaser c1 = new Phaser ( root, 3 ) ;
Phaser c2 = new Phaser ( root, 2 ) ;
Phaser c3 = new Phaser ( c1, 0 ) ;
public Phaser ( ) {
this ( null , 0 ) ;
}
public Phaser ( int parties) {
this ( null , parties) ;
}
public Phaser ( Phaser parent) {
this ( parent, 0 ) ;
}
public Phaser ( Phaser parent, int parties) {
if ( parties >>> PARTIES_SHIFT != 0 )
throw new IllegalArgumentException ( "Illegal number of parties" ) ;
int phase = 0 ;
this . parent = parent;
if ( parent != null ) {
final Phaser root = parent. root;
this . root = root;
this . evenQ = root. evenQ;
this . oddQ = root. oddQ;
if ( parties != 0 )
phase = parent. doRegister ( 1 ) ;
}
else {
this . root = this ;
this . evenQ = new AtomicReference < QNode > ( ) ;
this . oddQ = new AtomicReference < QNode > ( ) ;
}
this . state = ( parties == 0 ) ? ( long ) EMPTY :
( ( long ) phase << PHASE_SHIFT ) |
( ( long ) parties << PARTIES_SHIFT ) |
( ( long ) parties) ;
}
本来root有两个参与者(参与者:对应的参数值,即线程上限),然后为其加入了两个子Phaser(c1,c2),每个子Phaser会算作1个参与者,root的 参与者就变成2+2=4个,c1本来有3个参与者,为其加入了一个子Phaser c3,参与者数量变成3+1=4个,c3的参与 者初始为0,后续可以通过调用register()方法加入
对于树状Phaser上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的Phaser是 一样的,父Phaser并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0时,会把自己向父节点注册,当 子Phaser中注册的参与者数量等于0时,会自动向父节点解除注册
简单来说就是:父Phaser把子Phaser当作一个正常参与的线程 就即可
state变量解析:
大致了解了Phaser的用法和新特性之后,下面仔细剖析其实现原理,Phaser没有基于AQS来实现,但具备AQS的核⼼特性:state变量、CAS操作、阻塞队列(所以AQS是利用CAS的,一般来说,大多数的操作都是使用AQS,但是我们说成CAS也行,因为CAS是主要核心),我们先从state变量说起
private volatile long state;
这个64位(long的)的state变量被拆成4部分,下图为state变量各部分:
最高位(一般代表最后面的一位,比如100,那么1就是最高位)若是0,则表示未同步完成,若是1,则表示同步完成,初始最高位为0
Phaser提供了一系列的成员方法来从state中获取上图中的⼏个数字,如下所示:
public final int getPhase ( ) {
return ( int ) ( root. state >>> PHASE_SHIFT ) ;
}
private final Phaser root;
private volatile long state;
private static final int PHASE_SHIFT = 32 ;
private static final int PARTIES_SHIFT = 16 ;
public boolean isTerminated ( ) {
return root. state < 0L ;
}
public int getRegisteredParties ( ) {
return partiesOf ( state) ;
}
private static final int PARTIES_SHIFT = 16 ;
private static int partiesOf ( long s) {
return ( int ) s >>> PARTIES_SHIFT ;
}
public int getUnarrivedParties ( ) {
return unarrivedOf ( reconcileState ( ) ) ;
}
private static final int EMPTY = 1 ;
private static final int UNARRIVED_MASK = 0xffff ;
private static int unarrivedOf ( long s) {
int counts = ( int ) s;
return ( counts == EMPTY ) ? 0 : ( counts & UNARRIVED_MASK ) ;
}
private long reconcileState ( ) {
final Phaser root = this . root;
long s = state;
if ( root != this ) {
int phase, p;
while ( ( phase = ( int ) ( root. state >>> PHASE_SHIFT ) ) !=
( int ) ( s >>> PHASE_SHIFT ) &&
! STATE . weakCompareAndSet
( this , s,
s = ( ( ( long ) phase << PHASE_SHIFT ) |
( ( phase < 0 ) ? ( s & COUNTS_MASK ) :
( ( ( p = ( int ) s >>> PARTIES_SHIFT ) == 0 ) ? EMPTY :
( ( s & PARTIES_MASK ) | p) ) ) ) ) )
s = state;
}
return s;
}
我们再次的看一下state变量在构造方法中是如何被赋值的:
public Phaser ( Phaser parent, int parties) {
if ( parties >>> PARTIES_SHIFT != 0 )
throw new IllegalArgumentException ( "Illegal number of parties" ) ;
int phase = 0 ;
this . parent = parent;
if ( parent != null ) {
final Phaser root = parent. root;
this . root = root;
this . evenQ = root. evenQ;
this . oddQ = root. oddQ;
if ( parties != 0 )
phase = parent. doRegister ( 1 ) ;
}
else {
this . root = this ;
this . evenQ = new AtomicReference < QNode > ( ) ;
this . oddQ = new AtomicReference < QNode > ( ) ;
}
this . state = ( parties == 0 ) ? ( long ) EMPTY :
( ( long ) phase << PHASE_SHIFT ) |
( ( long ) parties << PARTIES_SHIFT ) |
( ( long ) parties) ;
}
private static final int PARTIES_SHIFT = 16 ;
private static final int PHASE_SHIFT = 32 ;
private static final int EMPTY = 1 ;
当parties=0时,state被赋予一个EMPTY常量,常量为1
当parties != 0时(并且没有超过2的16次方),那么把phase值左移32位,把parties左移16位,然后parties也作为最低的16位,3个值做或操 作,赋值给state(前面以及说明了流程了)
阻塞与唤醒(Treiber Stack):
基于上述的state变量,对其执行CAS操作,并进行相应的阻塞与唤醒,如下图所示,右边的主线程会调用awaitAdvance()进行阻塞,左边的arrive()会对state进行CAS的累减操作(也的确与之前的CountDownLatch类似),当未到达的线程数减到0时,唤醒右边阻 塞的主线程
在这里,阻塞使用的是一个称为Treiber Stack的数据结构,而不是AQS的双向链表,Treiber Stack是一个无锁 的栈,它是一个单向链表,出栈、入栈都在链表头部,所以只需要一个head指针,而不需要tail指针,如下的实 现:
private final AtomicReference < QNode > evenQ;
private final AtomicReference < QNode > oddQ;
static final class QNode implements ForkJoinPool. ManagedBlocker {
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread;
QNode next;
}
为了减少并发冲突,这里定义了2个链表,也就是2个Treiber Stack,当phase为奇数轮的时候,阻塞线程放在oddQ里面,当phase为偶数轮的时候,阻塞线程放在evenQ里面,代码如下所示(后面会再次的说明或者给出他的)
private void releaseWaiters ( int phase) {
QNode q;
Thread t;
AtomicReference < QNode > head = ( phase & 1 ) == 0 ? evenQ : oddQ;
while ( ( q = head. get ( ) ) != null &&
q. phase != ( int ) ( root. state >>> PHASE_SHIFT ) ) {
if ( head. compareAndSet ( q, q. next) &&
( t = q. thread) != null ) {
q. thread = null ;
LockSupport . unpark ( t) ;
}
}
}
arrive()方法分析:
下面看arrive()方法是如何对state变量进行操作,又是如何唤醒线程的:
public int arrive ( ) {
return doArrive ( ONE_ARRIVAL ) ;
}
public int arriveAndDeregister ( ) {
return doArrive ( ONE_DEREGISTER ) ;
}
arrive()和 arriveAndDeregister()内部调用的都是 doArrive方法
区别在于前者只是把"未达到线程数"减1(对应的值默认就是1,且基本不能改变,至于其final是否可以修改,可以看这个博客:https://blog.csdn.net/afdafvdaa/article/details/115190755),后者则把"未到达线程数"和"下一轮的总线程数"都减1,我们主要看第一种,下面看一下对应的doArrive方法的实现:
private int doArrive ( int adjust) {
final Phaser root = this . root;
for ( ; ; ) {
long s = ( root == this ) ? state : reconcileState ( ) ;
int phase = ( int ) ( s >>> PHASE_SHIFT ) ;
if ( phase < 0 )
return phase;
int counts = ( int ) s;
int unarrived = ( counts == EMPTY ) ? 0 : ( counts & UNARRIVED_MASK ) ;
if ( unarrived <= 0 )
throw new IllegalStateException ( badArrive ( s) ) ;
if ( STATE . compareAndSet ( this , s, s-= adjust) ) {
if ( unarrived == 1 ) {
long n = s & PARTIES_MASK ;
int nextUnarrived = ( int ) n >>> PARTIES_SHIFT ;
if ( root == this ) {
if ( onAdvance ( phase, nextUnarrived) )
n |= TERMINATION_BIT ;
else if ( nextUnarrived == 0 )
n |= EMPTY ;
else
n |= nextUnarrived;
int nextPhase = ( phase + 1 ) & MAX_PHASE ;
n |= ( long ) nextPhase << PHASE_SHIFT ;
STATE . compareAndSet ( this , s, n) ;
releaseWaiters ( phase) ;
}
else if ( nextUnarrived == 0 ) {
phase = parent. doArrive ( ONE_DEREGISTER ) ;
STATE . compareAndSet ( this , s, s | EMPTY ) ;
}
else
phase = parent. doArrive ( ONE_ARRIVAL ) ;
}
return phase;
}
}
}
关于上面的方法,有以下⼏点说明:
1:定义了2个常量如下
当 deregister=false 时(代表不解除注册),只是最低的16位需要减 1(s-=adjust),因为adj=ONE_ARRIVAL,当deregister=true时(代表解除注册),32位中 的2个低16位都需要减1(即前面的16位减1,后面的16位减1),因为adj=ONE_ARRIVAL|ONE_PARTY,那么是什么意思呢,在前面的s-=adjust中可以这样的理解,因为实际上如果是0000 0000 0000 0001,那么这个1,就是低16位的1,而adj=ONE_ARRIVAL|ONE_PARTY代表两个这个,因为他先操作了左移动16,然后操作ONE_ARRIVAL|ONE_PARTY,所以是低32位的两个低16位都会减1(这自然是对二进制来说的,比如1110,减去2,即减去0010,那么结果就是1100,即其中一个1减了,即减1了,只是上面的1是最低的)
private static final int ONE_ARRIVAL = 1 ;
private static final int ONE_PARTY = 1 << PARTIES_SHIFT ;
private static final int PARTIES_SHIFT = 16 ;
2:若以结果看,若把未到达线程数减1(因为有些操作会使得state改变,那么最终就会使得未到达线程数发生改变),减了之后,如果还未到0,说明其他的情况基本什么都不做(因为至少到1才会减到0),那么其他的情况基本直接返回(那么他对应的操作结束,执行后面的代码,就如前面说过的,latch.countDown();后面还可以操作latch.countDown();,就是可以执行,因为他只是操作减而已,那么只要他进行了减,说明操作完毕,并且由于先后顺序,自然使得先操作的基本不会抢到),如果到0,最终会做2件事情:第1, 重置state,把state的未到达线程个数重置到总的注册的线程数中,同时phase加1,第2,唤醒队列中的 线程
首先我们看一下唤醒方法:
private void releaseWaiters ( int phase) {
QNode q;
Thread t;
AtomicReference < QNode > head = ( phase & 1 ) == 0 ? evenQ : oddQ;
while ( ( q = head. get ( ) ) != null &&
q. phase != ( int ) ( root. state >>> PHASE_SHIFT ) ) {
if ( head. compareAndSet ( q, q. next) &&
( t = q. thread) != null ) {
q. thread = null ;
LockSupport . unpark ( t) ;
}
}
}
遍历整个栈,只要栈当中节点的phase不等于当前Phaser的phase,说明该节点不是当前轮的,而是前一轮 的,应该被释放并唤醒
awaitAdvance()方法分析:
public int awaitAdvance ( int phase) {
final Phaser root = this . root;
long s = ( root == this ) ? state : reconcileState ( ) ;
int p = ( int ) ( s >>> PHASE_SHIFT ) ;
if ( phase < 0 )
return phase;
if ( p == phase)
return root. internalAwaitAdvance ( phase, null ) ;
return p;
}
下面的while循环中有4个分⽀:
初始的时候,node==null,进入第1个分⽀进行自旋,自旋次数满⾜之后,会新建一个QNode节点
之后执行第3、第4个分⽀,分别把该节点入栈并阻塞
private int internalAwaitAdvance ( int phase, QNode node) {
releaseWaiters ( phase- 1 ) ;
boolean queued = false ;
int lastUnarrived = 0 ;
int spins = SPINS_PER_ARRIVAL ;
long s;
int p;
while ( ( p = ( int ) ( ( s = state) >>> PHASE_SHIFT ) ) == phase) {
if ( node == null ) {
int unarrived = ( int ) s & UNARRIVED_MASK ;
if ( unarrived != lastUnarrived &&
( lastUnarrived = unarrived) < NCPU )
spins += SPINS_PER_ARRIVAL ;
boolean interrupted = Thread . interrupted ( ) ;
if ( interrupted || -- spins < 0 ) {
node = new QNode ( this , phase, false , false , 0L ) ;
node. wasInterrupted = interrupted;
}
else
Thread . onSpinWait ( ) ;
}
else if ( node. isReleasable ( ) )
break ;
else if ( ! queued) {
AtomicReference < QNode > head = ( phase & 1 ) == 0 ? evenQ : oddQ;
QNode q = node. next = head. get ( ) ;
if ( ( q == null || q. phase == phase) &&
( int ) ( state >>> PHASE_SHIFT ) == phase)
queued = head. compareAndSet ( q, node) ;
}
else {
try {
ForkJoinPool . managedBlock ( node) ;
} catch ( InterruptedException cantHappen) {
node. wasInterrupted = true ;
}
}
}
if ( node != null ) {
if ( node. thread != null )
node. thread = null ;
if ( node. wasInterrupted && ! node. interruptible)
Thread . currentThread ( ) . interrupt ( ) ;
if ( p == phase && ( p = ( int ) ( state >>> PHASE_SHIFT ) ) == phase)
return abortWait ( phase) ;
}
releaseWaiters ( phase) ;
return p;
}
这里调用了ForkJoinPool.managedBlock(ManagedBlocker blocker)方法,⽬的是把node对应的线程阻塞,ManagerdBlocker是ForkJoinPool里面的一个静态接口,定义如下:
public class ForkJoinPool extends AbstractExecutorService {
public static interface ManagedBlocker {
boolean block ( ) throws InterruptedException ;
boolean isReleasable ( ) ;
}
}
static final class QNode implements ForkJoinPool. ManagedBlocker {
QNode实现了该接口,实现原理还是park(),如下所示,之所以没有直接使用park()/unpark()来实现阻塞、唤醒,而是封装了ManagedBlocker这一层,主要是出于使用上的方便考虑,一方面是park()可能被中断唤醒,另一 方面是带超时时间的park(),把这二者都封装在一起
static final class QNode implements ForkJoinPool. ManagedBlocker {
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread;
QNode next;
QNode ( Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this . phaser = phaser;
this . phase = phase;
this . interruptible = interruptible;
this . nanos = nanos;
this . timed = timed;
this . deadline = timed ? System . nanoTime ( ) + nanos : 0L ;
thread = Thread . currentThread ( ) ;
}
public boolean isReleasable ( ) {
if ( thread == null )
return true ;
if ( phaser. getPhase ( ) != phase) {
thread = null ;
return true ;
}
if ( Thread . interrupted ( ) )
wasInterrupted = true ;
if ( wasInterrupted && interruptible) {
thread = null ;
return true ;
}
if ( timed &&
( nanos <= 0L || ( nanos = deadline - System . nanoTime ( ) ) <= 0L ) ) {
thread = null ;
return true ;
}
return false ;
}
public boolean block ( ) {
while ( ! isReleasable ( ) ) {
if ( timed)
LockSupport . parkNanos ( this , nanos) ;
else
LockSupport . park ( this ) ;
}
return true ;
}
}
理解了arrive()和awaitAdvance(),arriveAndAwaitAdvance()就是二者的一个组合版本,这里就不说明了
Atomic类(Atomic:英文意思:原子的,所以接下来基本都是对某些数据的原子操作,比如后面学习的AtomicInteger类,一般代表操作int类型的原子操作,其也正好也是有private volatile int value;,value他刚好是int类型,所以即也基本是这样说明的):
AtomicInteger和AtomicLong :
如下面代码所示,对于一个整数的加减操作,要保证线程安全,需要加锁,也就是加synchronized关键字
public class MyClass {
private int count = 0 ;
public void synchronized increment ( ) {
count++ ;
}
public void synchronized decrement ( ) {
count-- ;
}
}
但有了Concurrent包的Atomic相关的类之后,synchronized关键字可以用AtomicInteger代替,其性能更 好,对应的代码变为:
public class MyClass {
private AtomicInteger count = new AtomicInteger ( 0 ) ;
public void add ( ) {
count. getAndIncrement ( ) ;
}
public long minus ( ) {
return count. getAndDecrement ( ) ;
}
}
package main5 ;
import java. util. concurrent. atomic. AtomicInteger ;
public class MyClass extends Thread {
private AtomicInteger count = new AtomicInteger ( 0 ) ;
int i = 0 ;
public void add ( ) {
i++ ;
count. getAndIncrement ( ) ;
}
public long minus ( ) {
i-- ;
return count. getAndDecrement ( ) ;
}
public static class add extends Thread {
MyClass m;
public add ( MyClass target) {
m = target;
}
@Override
public void run ( ) {
m. add ( ) ;
}
}
public static class add1 extends Thread {
MyClass m;
public add1 ( MyClass target) {
m = target;
}
@Override
public void run ( ) {
m. minus ( ) ;
}
}
public static void main ( String [ ] args) throws InterruptedException {
MyClass m = new MyClass ( ) ;
new add ( m) . start ( ) ;
new add1 ( m) . start ( ) ;
new add ( m) . start ( ) ;
new add1 ( m) . start ( ) ;
new add ( m) . start ( ) ;
new add1 ( m) . start ( ) ;
new add ( m) . start ( ) ;
new add1 ( m) . start ( ) ;
new add ( m) . start ( ) ;
new add1 ( m) . start ( ) ;
Thread . sleep ( 5000 ) ;
System . out. println ( m. i) ;
System . out. println ( m. count. get ( ) ) ;
}
}
具体为什么这样可以实现前面的方式且不加锁,我们看如下,其对应的源码如下:
public final int getAndIncrement ( ) {
return U . getAndAddInt ( this , VALUE , 1 ) ;
}
public final int getAndDecrement ( ) {
return U . getAndAddInt ( this , VALUE , - 1 ) ;
}
上面中的U是Unsafe的对象:
private static final jdk. internal. misc. Unsafe U = jdk. internal. misc. Unsafe. getUnsafe ( ) ;
private static final long VALUE = U . objectFieldOffset ( AtomicInteger . class , "value" ) ;
private volatile int value;
AtomicInteger的 getAndIncrement() 方法和 getAndDecrement() 方法都调用了一个方法,即U.getAndAddInt(…) 方法,该方法基于CAS实现:
@HotSpotIntrinsicCandidate
public final int getAndAddInt ( Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile ( o, offset) ;
} while ( ! weakCompareAndSetInt ( o, offset, v, v + delta) ) ;
return v;
}
do-while循环直到判断条件返回true为⽌(因为有个!),该操作就是之前有时候提到的自旋(一般称为自旋,当然,他里面或者条件通常会有阻塞时间的,比如类似于sleep的操作,即并不会始终的运行使得浪费资源)
getAndAddInt 方法具有volatile的语义(因为注解的存在),也就是对所有线程都是同时可⻅的(对应的操作可见)
而 weakCompareAndSetInt 方法的实现:
@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetInt ( Object o, long offset,
int expected,
int x) {
return compareAndSetInt ( o, offset, expected, x) ;
}
调用了 compareAndSetInt 方法,该方法的实现:
public final class Unsafe {
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt ( Object o, long offset,
int expected,
int x) ;
}
package java. lang. invoke ;
public abstract class VarHandle {
public final native
@MethodHandle.PolymorphicSignature
@HotSpotIntrinsicCandidate
boolean compareAndSet ( Object . . . args) ;
}
实际上我们说明的源码并不重要,重要的是其中的设计思想,因为源码是非常多的,要将所有的类都进行学习是不现实的,所以重要的是思想
悲观锁与乐观锁:
对于悲观锁,认为数据发⽣并发冲突的概率很大,读操作之前就上锁,通常比如加上synchronized关键字,后面要讲的ReentrantLock都是悲观锁的典型
对于乐观锁,认为数据发⽣并发冲突的概率比较小,读操作之前不上锁,等到写操作的时候,再判断数据在此 期间是否被其他线程修改了,如果被其他线程修改了,就把数据重新读出来,重复该过程,如果没有被修改,就写 回去,判断数据是否被修改,同时写回新值,这两个操作要合成一个原子操作,也就是CAS ( Compare And Set )
AtomicInteger的实现就是典型的乐观锁
我们可以看到AtomicInteger的实现就是利用Unsafe,现在我们来看看Unsafe这个类
Unsafe 的CAS详解(注意是Unsafe的CAS,因为CAS有多种实现方式,比如前面的VarHandle,但本质也是一样的,比如在77章博客提到的乐观锁):
我们知道,在前面多次的使用了CAS或者其相关的AQS,现在我们来说明一下CAS
Unsafe类是整个Concurrent包的基础,里面所有方法都是native的,如具体到上面提到的compareAndSetInt方 法,即:
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt ( Object o, long offset,
int expected,
int x) ;
要特别说明一下第二个参数,它是一个long型的整数,经常被称为xxxOffset,意思是某个成员变量在对应的 类中的内存偏移量(该变量在内存中的位置),表示该成员变量本身(或者说,将对应的成员变量值取出来)
第二个参数的值为AtomicInteger中的属性VALUE:
public final int getAndDecrement ( ) {
return U . getAndAddInt ( this , VALUE , - 1 ) ;
}
private static final long VALUE = U . objectFieldOffset ( AtomicInteger . class , "value" ) ;
private static final jdk. internal. misc. Unsafe U = jdk. internal. misc. Unsafe. getUnsafe ( ) ;
package jdk. internal. misc ;
import jdk. internal. HotSpotIntrinsicCandidate ;
import jdk. internal. vm. annotation. ForceInline ;
import java. lang. reflect. Field ;
import java. security. ProtectionDomain ;
public final class Unsafe {
}
而Unsafe的 objectFieldOffset(…) 方法调用,就是为了找到AtomicInteger类中value属性所在的内存偏 移量(或者说其位置,使得可以获取值或者设置值)
objectFieldOffset 方法的实现:
public long objectFieldOffset ( Field f) {
if ( f == null ) {
throw new NullPointerException ( ) ;
}
return objectFieldOffset0 ( f) ;
}
public long objectFieldOffset ( Class < ? > c, String name) {
if ( c == null || name == null ) {
throw new NullPointerException ( ) ;
}
return objectFieldOffset1 ( c, name) ;
}
其中objectFieldOffset1的实现为:
private native long objectFieldOffset0 ( Field f) ;
private native long objectFieldOffset1 ( Class < ? > c, String name) ;
很明显,他们都是native,即其他语言(一般是非java代码的)的操作
这里的Unsafe所有调用CAS的地方,基本都会先通过这个方法把成员变量转换成一个Offset(即VALUE,他也正好是long类型,因为对应的返回值也是long类型),再次以AtomicInteger为例:
private static final jdk. internal. misc. Unsafe U = jdk. internal. misc. Unsafe. getUnsafe ( ) ;
private static final long VALUE = U . objectFieldOffset ( AtomicInteger . class , "value" ) ;
从上面代码可以看到,无论是Unsafe对应的对象(自带的),还是VALUE,都是静态的,也就是类级别的,所有对象共用的,此处的VALUE实际上就代表了value变量本身,后面执行CAS操作的时候,不是直接操作value,而是操作VALUE,使得间接的操作value的获取以及修改,因为我们的value是不能直接的给其他静态资源(类,方法等等)修改的,所以利用其他静态资源进行操作获取或者修改(设置)值,虽然大多数并不是静态的或者方法没有静态(一般只有三个变量是静态的),但也是一种加强保护(不只是防止别人,也防止自身了,使得对应的一些不能进行操作或者不能直接的进行操作)
private static final long serialVersionUID = 6214790243416807050L ;
private static final jdk. internal. misc. Unsafe U = jdk. internal. misc. Unsafe. getUnsafe ( ) ;
private static final long VALUE = U . objectFieldOffset ( AtomicInteger . class , "value" ) ;
private volatile int value;
自旋与阻塞 :
当一个线程拿不到锁的时候,有以下两种基本的等待策略:
策略1:放弃CPU,进入阻塞状态,等待后续被唤醒,再重新被操作系统调度
策略2:不放弃CPU,空转,不断重试,也就是所谓的"自旋"(一般我们也会设置自旋的时间,就如sleep设置的时间一样,当然也可以设置时间到了,也可以使得操作策略1,通常我们都会设置,比如wait虽然没有操作自旋,但是可以认为是时间非常少的自旋,然后时间到了操作策略1(虽然并不是))
很显然,如果是单核的CPU,只能用策略1,因为如果不放弃CPU,那么其他线程无法运行(就如释放锁,释放锁一般实际上就是释放cpu,因这里给出一个容易理解的方式,只有你在运行,那么cpu就是占用的,否则单纯的阻塞,或者说没有运行,那么cpu就是释放,因为运行是需要cpu的,那么sleep占用cpu吗,答:是占用的,他可以认为是自旋,当时间到了就不阻塞了,而wait可以认为没有占用,即可以认为是策略1),也就无法释放 锁,但对于多CPU或者多核,策略2就很有用了,因为没有线程切换的开销,所以一般多核会选择使用第二种,一般现在的电脑都是多核,所以以策略2为主,所以你也基本上测试不出来单核cpu的情况,因为这是硬件的问题,你也通常改变不了,那么这里就可以得出一个结论,线程切换(阻塞到唤醒之间的切换,导致线程切换,简称阻塞唤醒切换)的开销大于始终占用cpu的开销(也就是无限循环),所以一般自旋性能大于单纯的阻塞唤醒切换(阻塞唤醒切换:一种操作,说明他是可以被唤醒或者阻塞的,或者说被唤醒),当然,这是对于对应的线程数竞争不激烈的情况下说明的,因为线程多点,cpu还有,那么若阻塞和唤醒就会操作多,即不好,但是若非常多的话,那么阻塞和唤醒要好了,因为自旋是始终占用cpu的,那么cpu需要等了,所以若是少的,那么自旋自然可能会比阻塞切换的效率高,即就是是少的自旋性能通常也是好的,因为他是始终的重试的,你可能说,即再一定的线程下,那么自旋好了(因为重试的存在,使得我单纯的不用切换,也就是切换变成了重试,所以开销少,效率好,即运行效率好,即更快的执行(获取锁)完毕),但是如果你一直重试,并且认为他永不切换,或者线程过多,就阻塞唤醒好,所以通常自旋会定义一个临界值(synchronized一般比较大),认为你没有作用或者到自旋一定次数,还是操作阻塞唤醒切换这些操作吧,因为无限制的重试或者线程过多,自然比单纯的直接阻塞唤醒要开销大
AtomicInteger的实现就用的是"自旋"策略(或者说对应的Unsafe的CAS就是这样的策略,CAS为什么会操作一个人,是因为CAS其当有人操作成功后,其他的人基本都会失败,而这种失败,我们也称为CAS的没有获取锁的意思,这个失败的意思在后面会提到的),如果拿不到锁,就会一直重试
注意:以上两种策略并不互斥,可以结合使用,如果获取不到锁,先自旋,如果自旋还拿不到锁,再阻塞(策略1),synchronized关键字就是这样的实现策略(一般他实际上他也操作了自旋拿不到锁才阻塞,会自旋一会时间,这个时间通常比较大,一般来说synchronized释放锁后也会操作唤醒,一般是全部唤醒,因为是不公平的,公平的可能只会唤醒前一个,或者也全部唤醒,但是因为顺序的原因,导致可能继续阻塞的,若第一个快速的释放,那么可能他会在阻塞前又操作了唤醒,所以如果是后面一种情况,一般我们都会自旋一下,当然这基本作用不大,主要是需要在选择之后,主动等待对方都阻塞才往后面操作,当然主要是"公平的可能只会唤醒前一个"), 除了AtomicInteger是这样外,AtomicLong也是同样的原理,所以就不多说了,一般现在的锁都会操作自旋,并且留有一定的时间准备操作策略1,通常原语操作基本都是策略1的,比如wait,也可能操作策略2,通常lock和synchronized基本操作策略2
综上所述,我们将CAS称为锁的作用也行,在前面我们通常也是这样认为的(因为自旋的存在),所以在前面若有认为CAS也是锁,那么也是正确的,因为一般来说,CAS都需要操作自旋,来保证锁的阻塞作用(前面说明的源码相关CAS基本都操作了自旋),而由于CAS一般利用版本号思想(77章博客有说明,而CAS的版本号只是利用其他语言实现的而已,因为数据库可以实现,那么自然其他语言也能实现,即操作了native,这个其他语言通常是C/C++,可能还有其他的语言操作,具体要看对应的配置给该方法的指向语言的操作,通常是C/C++而已,极少或者没有其他的除了C/C++的语言操作了),且对应的设置通常是唯一的,那么自旋自然可以操作,也就能认为可以阻塞了(策略1),这也是之前说明的CAS可以阻塞的原因(而我们一般将这种有阻塞CAS的操作称为AQS)
那么为什么自旋要进行操作呢,这是因为始终的无限循环对内存不好,需要有个结果才是好的,所以需要进行操作
AtomicBoolean和AtomicReference :
为什么需要AtomicBoolean:
对于int或者long型变量,需要进行加减操作(被多次的操作),所以要加锁,因为对于boolean来说,int或者long进行操作的多(特别是计算,有多个该两个类型),所以通常需要加锁,但对于一个boolean类型来说,true或false的赋值和取值操作,基本上是非常少的,通常只需要加上volatile关键字就够了,为什么还需要AtomicBoolean呢:
这是因为往往要实现下面这种功能(或者说需要加锁的功能,因为虽然通常只需要加上volatile即可,但是有些情况还是需要加锁的):
if ( ! flag) {
flag = true ;
}
if ( flag == false ) {
flag = true ;
}
上面总体来说,也就是要实现 compare(比较)和set(赋值)两个操作合在一起的原子性,而这也正是CAS自带提供的功能,所以对应的AtomicBoolean基本也就是使用CAS操作,即上面的代码,就可以变 成:
if ( compareAndSet ( false , true ) ) {
}
public class AtomicBoolean implements java. io. Serializable {
private static final VarHandle VALUE ;
public final boolean compareAndSet ( boolean expectedValue, boolean newValue) {
return VALUE . compareAndSet ( this ,
( expectedValue ? 1 : 0 ) ,
( newValue ? 1 : 0 ) ) ;
}
}
同样地,AtomicReference也需要同样的功能(一般我们主要说明AtomicBoolean),对应的方法如下:
public class AtomicReference < V > implements java. io. Serializable {
private static final VarHandle VALUE ;
public final boolean compareAndSet ( V expectedValue, V newValue) {
return VALUE . compareAndSet ( this , expectedValue, newValue) ;
}
}
对应的VarHandle的compareAndSet在前面说明过,这里再次的提一下(给出):
public final native
@MethodHandle.PolymorphicSignature
@HotSpotIntrinsicCandidate
boolean compareAndSet ( Object . . . args) ;
其中,以AtomicBoolean为例,expect是旧的引用,update为新的引用,其他的基本都是如此(因为是Object,所以是引用)
上面两种可以看到是利用了VarHandle来使得支持CAS,并且支持boolean的类型(实际上由于是引用,所以基本上可以支持任意类型),所以我们只看看Unsafe类如何⽀持boolean和double类型:
在Unsafe类中,也提供了三种类型的CAS操作:int、long、Object(也就是引用类型),如下所示:
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt ( Object o, long offset,
int expected,
int x) ;
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetLong ( Object o, long offset,
long expected,
long x) ;
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetObject ( Object o, long offset,
Object expected,
Object x) ;
即,在jdk的实现中,这三种CAS操作都是由底层实现的,其他类型的CAS操作都要转换为这三种之一进行操 作
比如对应的支持的boolean和double类型:
@ForceInline
public final boolean compareAndSetDouble ( Object o, long offset,
double expected,
double x) {
return compareAndSetLong ( o, offset,
Double . doubleToRawLongBits ( expected) ,
Double . doubleToRawLongBits ( x) ) ;
}
@ForceInline
public final boolean compareAndSetBoolean ( Object o, long offset,
boolean expected,
boolean x) {
return compareAndSetByte ( o, offset, bool2byte ( expected) , bool2byte ( x) ) ;
}
@HotSpotIntrinsicCandidate
public final boolean compareAndSetByte ( Object o, long offset,
byte expected,
byte x) {
return compareAndExchangeByte ( o, offset, expected, x) == expected;
}
@HotSpotIntrinsicCandidate
public final byte compareAndExchangeByte ( Object o, long offset,
byte expected,
byte x) {
long wordOffset = offset & ~ 3 ;
int shift = ( int ) ( offset & 3 ) << 3 ;
if ( BE ) {
shift = 24 - shift;
}
int mask = 0xFF << shift;
int maskedExpected = ( expected & 0xFF ) << shift;
int maskedX = ( x & 0xFF ) << shift;
int fullWord;
do {
fullWord = getIntVolatile ( o, wordOffset) ;
if ( ( fullWord & mask) != maskedExpected)
return ( byte ) ( ( fullWord & mask) >> shift) ;
} while ( ! weakCompareAndSetInt ( o, wordOffset,
fullWord, ( fullWord & ~ mask) | maskedX) ) ;
return expected;
}
@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetInt ( Object o, long offset,
int expected,
int x) {
return compareAndSetInt ( o, offset, expected, x) ;
}
对应这三种CAS操作中其中的参数(以compareAndSetInt为例,其他的基本都是一样的):
1:第一个参数是要修改的对象
2:第二个参数是对象的成员变量(value)在内存中的位置(一个long型的整数),前面说明过了
3:第三个参数是该变量(是该成员变量)的旧值
4:第四个参数是该变量的新值
AtomicBoolean类型如何⽀持CAS(即对应的类似的compareAndSet操作,虽然前面提到过了)?
对于用int型来代替的,在入参的时候,将boolean类型转换成int类型,在返回值的时候,将int类型转换成boolean类型,如下所示:
public final boolean compareAndSet ( boolean expectedValue, boolean newValue) {
return VALUE . compareAndSet ( this ,
( expectedValue ? 1 : 0 ) ,
( newValue ? 1 : 0 ) ) ;
}
如果是double类型,Unsafe类又如何⽀持呢?
这依赖double类型提供的一对double类型和long类型互转的方法:
public final class Double extends Number implements Comparable < Double > {
@HotSpotIntrinsicCandidate
public static native long doubleToRawLongBits ( double value) ;
@HotSpotIntrinsicCandidate
public static native double longBitsToDouble ( long bits) ;
}
而在Unsafe类中的方法实现:
@ForceInline
public final boolean compareAndSetDouble ( Object o, long offset,
double expected,
double x) {
return compareAndSetLong ( o, offset,
Double . doubleToRawLongBits ( expected) ,
Double . doubleToRawLongBits ( x) ) ;
}
即对应都是支持的(比如int,long,boolean,double等等,其他就不多说了,由于VarHandle是引用,所以通常都能支持,所以上面只是说明Unsafe的情况),其中对应的类基本都是因为Unsafe类或者VarHandle类使得可以支持CAS
具体Unsafe比如其他的情况:
@ForceInline
public final boolean compareAndSetFloat ( Object o, long offset,
float expected,
float x) {
return compareAndSetInt ( o, offset,
Float . floatToRawIntBits ( expected) ,
Float . floatToRawIntBits ( x) ) ;
}
其他情况看源码吧,这里就不依次列出来了,但是基本上都也是支持的,因为其也包括Object的方法:
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetObject ( Object o, long offset,
Object expected,
Object x) ;
AtomicStampedReference和AtomicMarkableReference :
ABA问题与解决办法:
到⽬前为⽌,CAS都是基于"值"来做比较的,但如果另外一个线程把变量的值从A改为B,再从B改回到A,那么 尽管修改过两次,可是在当前线程做CAS操作的时候,却会因为值没变而认为数据没有被其他线程修改过,这就是 所谓的ABA问题,特别的再某些时候,可能会重复的进行操作,造成问题,比如如果还是A那么进行减减等等(虽然可以加锁,但是CAS是不操作锁的,即这里是乐观锁的问题)
举例来说:
小张⽋小李100块,约定今天还,给打到银行卡
小李的银行卡余额是0,打过来之后应该是100块
小张今天还钱这个事小李知道,小李还告诉了自己媳妇,小张还钱了,并且小李媳妇看到了,马上就取出来花掉了
然后小李恰好在他媳妇取出之后检查账户,一看余额还是0,然后找小张要账(即被还钱,和被媳妇花掉是一瞬间的,还没有来得及看余额)
这其中,小李的账户余额从0到100,再从100到0,小李一开始检查是0,第二次检查还是0,就认为小张没 还钱
实际上小李媳妇花掉了,这就是ABA问题,及A到B再到A,其实小李可以查看账户的收⽀记录
所以我们要解决ABA问题,就需要解决上面说的查看账户的收支记录,所以,不仅要比较"值",还要比较"版本号"(77章博客那里就是这样的操作,因为ABA问题是乐观锁的问题,所以也属于乐观锁),也就还要比较是否是原来的那条记录(因为值可以一样,但是可能并不是原来的那个操作了)
而这正是 AtomicStampedReference(一般AtomicReference和AtomicStampedReference都是操作引用,也就是除了基本的对象,如Integer外的引用对象,或者说,加上了Reference相关的,基本都是操作这样的引用,代表操作对象,比如我们创建的对象,当然实际上也包括基本的对象,所以说,他基本能够操作所有的类型,因为他是泛型)做的事情,其对应的CAS方法如下:
public boolean compareAndSet ( V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair < V > current = pair;
return
expectedReference == current. reference &&
expectedStamp == current. stamp &&
( ( newReference == current. reference &&
newStamp == current. stamp) ||
casPair ( current, Pair . of ( newReference, newStamp) ) ) ;
}
之前的 CAS一般只有两个重要参数,这里的 CAS一般有四个重要参数,后两个参数就是版本号的旧值和新值,当expectedReference != 对象当前的reference时,说明该数据肯定被其他线程修改过,也就会返回false(因为&&),当expectedReference == 对象当前的reference时,说明没有被修改,但不确定是否是原来的记录了,所以那么再进一步比较expectedStamp是否等于对象当前的版本 号,继续的以此判断数据是否被其他线程修改过,而只要修改过,那么自然返回false,使得不进行操作,因为我的是旧数据,CAS就是这样操作的,即只能一人操作成功,当然虽然他们失败,但是可以进行阻塞或者说自旋等等操作,所以失败实际上就是判断是否操作过的意思(这里解释前面的"而这种失败,我们也称为CAS的没有获取锁的意思,这个失败的意思在后面会提到的"这个地方)
一般来说VarHandle只会操作三个参数,而Unsafe一般是2个或者4个参数,如果发现compareAndSet或者xxxcompareAndSetxxx的操作,那么通常说明的是VarHandle或者Unsafe(但是并不绝对,比如AbstractQueuedSynchronizer里面的compareAndSetState也是操作compareAndSet,所以对应的xxx应该代表具体的Unsafe操作的类型名称,通常固定,而不是所有的xxx未知数,比如前面出现的weakCompareAndSetInt,他里面就操作了compareAndSetInt,至此说明完毕),如果对应的参数不对,通常他并不是执行对应的方法,他对应方法里面通常才会执行,比如之前的"head.compareAndSet(q, q.next)"(可以全局搜索,使用ctrl+f,然后复制粘贴即可找到)实际上他里面还有其他的操作,最终使得操作完毕(一般是三个参数),从上面可以看出,那么AtomicStampedReference对应的操作是使用VarHandle的(因为是compareAndSet),可以选择自己看一下上面的casPair方法即可
private boolean casPair ( Pair < V > cmp, Pair < V > val) {
return PAIR . compareAndSet ( this , cmp, val) ;
}
private static final VarHandle PAIR ;
public final native
@MethodHandle.PolymorphicSignature
@HotSpotIntrinsicCandidate
boolean compareAndSet ( Object . . . args) ;
发现也的确如此
为什么没有AtomicStampedInteger或AtomictStampedLong:
我们要解决Integer或者Long型变量的ABA问题,但是为什么只有AtomicStampedReference,而没有AtomicStampedInteger或者AtomictStampedLong呢:
因为这里要同时比较数据的"值"和"版本号",而Integer型或者Long型的CAS没有办法同时比较两个变量(他们优先只是考虑值的,而不是具体引用),于是为了进行解决,所以实际上AtomicStampedReference会有其他的操作进行解决,即操作把值和版本号封装成一个对象,也就是其里面的Pair内部类,然后通过对象引用的CAS来实现,代码 如下所示:
public class AtomicStampedReference < V > {
private static class Pair < T > {
final T reference;
final int stamp;
private Pair ( T reference, int stamp) {
this . reference = reference;
this . stamp = stamp;
}
static < T > Pair < T > of ( T reference, int stamp) {
return new Pair < T > ( reference, stamp) ;
}
}
private static final VarHandle PAIR ;
static {
try {
MethodHandles. Lookup l = MethodHandles . lookup ( ) ;
PAIR = l. findVarHandle ( AtomicStampedReference . class , "pair" ,
Pair . class ) ;
} catch ( ReflectiveOperationException e) {
throw new ExceptionInInitializerError ( e) ;
}
}
private boolean casPair ( Pair < V > cmp, Pair < V > val) {
return PAIR . compareAndSet ( this , cmp, val) ;
}
}
当使用的时候,在构造方法里面传入值和版本号两个参数,应用程序对版本号进行累加操作,然后调用上面的CAS进行总体设置(具体可以参照77章博客),如下所示:
public AtomicStampedReference ( V initialRef, int initialStamp) {
pair = Pair . of ( initialRef, initialStamp) ;
}
AtomicMarkableReference:
AtomicMarkableReference与AtomicStampedReference原理类似,只是Pair里面的版本号是boolean类型 的,而不是整型的累加变量,如下所示:
public class AtomicMarkableReference < V > {
private static class Pair < T > {
final T reference;
final boolean mark;
private Pair ( T reference, boolean mark) {
this . reference = reference;
this . mark = mark;
}
static < T > Pair < T > of ( T reference, boolean mark) {
return new Pair < T > ( reference, mark) ;
}
}
}
因为是boolean类型,只能有true、false 两个版本号,所以并不能完全避免ABA问题,只是降低了ABA发⽣的 概率,因为可能当你修改后,其他多个线程或多次的修改,使得版本号还是会变回来,那么自然就没有避免ABA问题了,即这里的版本号会回来的,而不是累加的不会回来,所以通常只是针对于两个线程交替来操作的,或者少量的线程,但也建议不用使用,因为可能一个线程可以多次的进行操作
AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater:
为什么需要AtomicXXXFieldUpdater:
如果一个类是自己编写的,则可以在编写的时候把成员变量定义或者实现操作Atomic(原子)类型,但如果是一个已经有的类,在 不能更改其源代码的情况下,要想实现对其成员变量的原子操作,通常就需要AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater来在外部进行操作,当然我们也可以自己进行操作,只是他给出了方便的类,那么自然我们可以直接使用即可
通过AtomicIntegerFieldUpdater理解它们的实现原理
AtomicIntegerFieldUpdater是一个抽象类
⾸先,其构造方法是protected,不能直接构造其对象,必须通过它提供的一个静态方法来创建,如下所示:
public abstract class AtomicIntegerFieldUpdater < T > {
protected AtomicIntegerFieldUpdater ( ) {
}
}
方法 newUpdater 用于创建AtomicIntegerFieldUpdater类对象:
@CallerSensitive
public static < U > AtomicIntegerFieldUpdater < U > newUpdater ( Class < U > tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl < U >
( tclass, fieldName, Reflection . getCallerClass ( ) ) ;
}
newUpdater(…)静态方法传入的是要修改的类(不是对象)和对应的成员变量的名字,内部通过反射拿到这个 类的成员变量,然后包装成一个AtomicIntegerFieldUpdater对象,所以,这个对象表示的是类的某个成员,而不 是该对象的成员变量,因为我们是操作类的,而不是单只对象(因为类的对象可以有多个),即只要操作该成员,然后传递对应的对象,那么都会操作(一个类只能有一个Class)
即若要修改某个对象的成员变量的值,一般需要再传入相应的对象,如下所示:
public int getAndIncrement ( T obj) {
int prev, next;
do {
prev = get ( obj) ;
next = prev + 1 ;
} while ( ! compareAndSet ( obj, prev, next) ) ;
return prev;
}
public final int getAndIncrement ( T obj) {
return getAndAdd ( obj, 1 ) ;
}
public final int getAndAdd ( T obj, int delta) {
accessCheck ( obj) ;
return U . getAndAddInt ( obj, offset, delta) ;
}
public abstract class AtomicIntegerFieldUpdater < T > {
private static final class AtomicIntegerFieldUpdaterImpl < T >
extends AtomicIntegerFieldUpdater < T > {
private static final Unsafe U = Unsafe . getUnsafe ( ) ;
private final long offset;
private final Class < ? > cclass;
private final void accessCheck ( T obj) {
if ( ! cclass. isInstance ( obj) )
throwAccessCheckException ( obj) ;
}
public final boolean compareAndSet ( T obj, int expect, int update) {
accessCheck ( obj) ;
return U . compareAndSetInt ( obj, offset, expect, update) ;
}
}
}
accecssCheck方法的作用是检查该obj是不是对应的class类型,如果不是,则拒绝修改(CAS操作,即也是服务了多个线程),抛出异常,从代码可以看到,其 CAS 原理和 AtomicInteger 是一样的,底层都调用了 Unsafe 的compareAndSetInt(…)方法
限制条件:
要想使用AtomicIntegerFieldUpdater修改成员变量,成员变量必须是volatile的int类型(不能是Integer包装 类),该限制从其构造方法中可以看到:
AtomicIntegerFieldUpdaterImpl ( final Class < T > tclass,
final String fieldName,
final Class < ? > caller) {
final Field field;
final int modifiers;
try {
field = AccessController . doPrivileged (
new PrivilegedExceptionAction < Field > ( ) {
public Field run ( ) throws NoSuchFieldException {
return tclass. getDeclaredField ( fieldName) ;
}
} ) ;
modifiers = field. getModifiers ( ) ;
sun. reflect. misc. ReflectUtil. ensureMemberAccess (
caller, tclass, null , modifiers) ;
ClassLoader cl = tclass. getClassLoader ( ) ;
ClassLoader ccl = caller. getClassLoader ( ) ;
if ( ( ccl != null ) && ( ccl != cl) &&
( ( cl == null ) || ! isAncestor ( cl, ccl) ) ) {
sun. reflect. misc. ReflectUtil. checkPackageAccess ( tclass) ;
}
} catch ( PrivilegedActionException pae) {
throw new RuntimeException ( pae. getException ( ) ) ;
} catch ( Exception ex) {
throw new RuntimeException ( ex) ;
}
if ( field. getType ( ) != int . class )
throw new IllegalArgumentException ( "Must be integer type" ) ;
if ( ! Modifier . isVolatile ( modifiers) )
throw new IllegalArgumentException ( "Must be volatile type" ) ;
this . cclass = ( Modifier . isProtected ( modifiers) &&
tclass. isAssignableFrom ( caller) &&
! isSamePackage ( tclass, caller) )
? caller : tclass;
this . tclass = tclass;
this . offset = U . objectFieldOffset ( field) ;
}
⾄于 AtomicLongFieldUpdater、AtomicReferenceFieldUpdater,也有类似的限制条件,其底层的CAS原 理,也和对应的AtomicLong、AtomicReference是一样的,这样我们就不多说了(以后看到这样的说明,那么代表类似,即我不会进行多说明)
AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray :
Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray三个数组元素的原子 操作,注意,这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操作而言
使用方式 :
以AtomicIntegerArray为例,其使用方式如下:
public final int getAndIncrement ( int i) {
return ( int ) AA . getAndAdd ( array, i, 1 ) ;
}
public class AtomicIntegerArray implements java. io. Serializable {
private static final VarHandle AA
= MethodHandles . arrayElementVarHandle ( int [ ] . class ) ;
private final int [ ] array;
}
相比于AtomicInteger的getAndIncrement()方法,这里只是一个传入参数:数组的下标i(而不是偏移量),当然这是因为对应的CAS是不同的,因为其是Unsafe,其一般操作偏移量,而这里操作VarHandle(具体为什么Unsafe一般操作偏移量,在前面说明过了,即这个"所以需要进行查找,然后找到后比较"地方)
其他方法也与此类似,相比于 AtomicInteger 的各种加减方法,也都是对应的一个下标 i,如下所示
public final int getAndDecrement ( int i) {
return ( int ) AA . getAndAdd ( array, i, - 1 ) ;
}
public final int getAndSet ( int i, int newValue) {
return ( int ) AA . getAndSet ( array, i, newValue) ;
}
public final boolean compareAndSet ( int i, int expectedValue, int newValue) {
return AA . compareAndSet ( array, i, expectedValue, newValue) ;
}
public final int getAndIncrement ( ) {
return U . getAndAddInt ( this , VALUE , 1 ) ;
}
public final int getAndDecrement ( ) {
return U . getAndAddInt ( this , VALUE , - 1 ) ;
}
public final boolean compareAndSet ( int expectedValue, int newValue) {
return U . compareAndSetInt ( this , VALUE , expectedValue, newValue) ;
}
实现原理:
其底层的CAS方法直接调用VarHandle中native的getAndAdd方法,如下所示:
public final native
@MethodHandle.PolymorphicSignature
@HotSpotIntrinsicCandidate
Object getAndAdd ( Object . . . args) ;
public final native
@MethodHandle.PolymorphicSignature
@HotSpotIntrinsicCandidate
boolean compareAndSet ( Object . . . args) ;
明⽩了AtomicIntegerArray的实现原理,另外两个数组的原子类实现原理与之类似,这样就不多说了
Striped64与LongAdder:
从JDK 8开始,针对Long型的原子操作(前面大致说明了AtomicLong),Java又提供了LongAdder、LongAccumulator,针对Double类型,Java提供了DoubleAdder、DoubleAccumulator,Striped64相关的类的继承层次如下图所示
没有颜色的(白色)的箭头代表是其(指向的)子类
abstract class Striped64 extends Number {
}
public class LongAdder extends Striped64 implements Serializable {
}
public class LongAccumulator extends Striped64 implements Serializable {
}
public class DoubleAdder extends Striped64 implements Serializable {
}
public class DoubleAccumulator extends Striped64 implements Serializable {
}
LongAdder原理 :
LongAdder内部最终操作的是一个volatile long型变量(与AtomicLong基本是一样的,即base变量和其他Cell对应的变量的和,主要默认base变量,因为一开始定义的是long sum = base;),由多个线程对这个变量进行CAS操作,多个线程同时对一个变量 进行CAS操作,在高并发的场景下仍不够快(因为只能操作一个,并且他是原子的,所以会慢点),如果再要提高性能,该怎么做呢?
把一个变量拆成多份,变为多个变量(也能解决单个变量的存储上限的问题,当然,一般超过上限,通常会操作特别的操作),有些类似于 ConcurrentHashMap 的分段锁(每个头节点分别加锁)的例子,如下图所示,把一 个Long型拆成一个base变量外加多个Cell,每个Cell包装了一个Long型变量,当多个线程并发累加的时候,如果并 发度低,就直接加到base变量上,如果并发度高,冲突大,平摊到这些Cell上(具体平摊看当时的算法,你可以认为是平均的加上,即除以4等等,当然,这是不好的,因为有精度的问题,所以通常是依次的给予,即第一个操作获取数据,那么到第二个,那么很明显,从下图中可以知道第四个之后就是第一个了,而这样,他们的数据基本是不会一样的,因为是依次,而依次的参数的值通常不同,所以基本不会是一样的),在最后取值的时候,再把base和这 些Cell求sum运算
以LongAdder的sum()方法为例,如下所示:
public long sum ( ) {
Cell [ ] cs = cells;
long sum = base;
if ( cs != null ) {
for ( Cell c : cs)
if ( c != null )
sum += c. value;
}
return sum;
}
@jdk.internal.vm.annotation.Contended static final class Cell {
volatile long value;
}
由于无论是long,还是double,都是64位的,但因为没有double型的CAS操作,所以是通过把double型转化 成long型来实现的,所以,上面的base和cell[]变量,是位于基类Striped64当中的,英文Striped意为"条带",也就 是分⽚
abstract class Striped64 extends Number {
transient volatile Cell [ ] cells;
transient volatile long base;
@jdk.internal.vm.annotation.Contended static final class Cell {
volatile long value;
Cell ( long x) { value = x; }
}
Striped64 ( ) {
}
}
最终一致性 (或者说弱一致性):
在sum求和方法中,并没有对cells[]数组加锁,也就是说,一边有线程对其执行求和操作,一边还有线程修改 数组里的值,也就是最终一致性(在某些集群中一般代表会获取旧值,但是他们的读取和更新并不会使得破坏数据,只是读取的可能是旧值而已,当然,他们是不同的副本的,所以没有操作破坏数据,因为单个副本的更新的原子的,就如mysql一样,有写锁,在83章博客也说明过"最终一致性的问题"),而不是强一致性,这也类似于ConcurrentHashMap 中的 clear()方法,一边执行 清空操作,一边还有线程放入数据,clear()方法调用完毕后再读取,hash map里面可能还有元素,因此,LongAdder适合高并发的统计场景,而不适合要对某个 Long 型变量进行严格同步的场景
这里之所以也说最终一致性
伪共享与缓存行填充:
在Cell类的定义中,用了一个独特的注解@jdk.internal.vm.annotation.Contended(有些版本可能是sun.misc.Contended),这是JDK 8之后才有的,背后涉及一个很重 要的优化原理:伪共享与缓存行填充
@jdk.internal.vm.annotation.Contended static final class Cell {
}
@jdk.internal.vm.annotation.Contended static final class Cell {
volatile long value;
Cell ( long x) { value = x; }
final boolean cas ( long cmp, long val) {
return VALUE . compareAndSet ( this , cmp, val) ;
}
final void reset ( ) {
VALUE . setVolatile ( this , 0L ) ;
}
final void reset ( long identity) {
VALUE . setVolatile ( this , identity) ;
}
final long getAndSet ( long val) {
return ( long ) VALUE . getAndSet ( this , val) ;
}
private static final VarHandle VALUE ;
static {
try {
MethodHandles. Lookup l = MethodHandles . lookup ( ) ;
VALUE = l. findVarHandle ( Cell . class , "value" , long . class ) ;
} catch ( ReflectiveOperationException e) {
throw new ExceptionInInitializerError ( e) ;
}
}
}
每个 CPU 都有自己的缓存(一般来说缓存代表保留的数据,比如当你重复的读取某个数据时,那么会第二次开始就直接的使用缓存的,而不用去找对应的地址,这样就可以使得提高效率,在前面也说明过"初始化后或者普通的打印,称为1",这个初始化就是缓存的利用,当然,我们还是操作本来的数据的,只是我们多出了缓存而已,注意:是先保留缓存,然后将缓存数据到主内存,前面也说明过了,即"即缓存是在写入内存之前的"),那么缓存与主内存进行数据交换的基本单位叫Cache Line(缓存行),在64位x86架 构中,缓存行是64字节,也就是8个Long型的大小,这也意味着当缓存失效(也称为更新,专业名词一般我们称为失效,代表原来的对应缓存直接没了,所以我们称为失效),要刷新到主内存的时候,最少要刷新64字节
如下图所示,主内存中有变量X、Y、Z(假设每个变量都是一个Long型),被CPU1(Core0)和CPU2(Core1)分别读入自己的缓 存,放在了同一行Cache Line里面,当CPU1修改了X变量,它要失效整行Cache Line(也就是说,对应的缓存要更新了,但是他确直接更新整个行,而不是对应的一个操作,即变量),然后也就是往总线上发消息,通 知CPU 2对应的Cache Line失效,由于Cache Line是数据交换的基本单位,无法只失效X,要失效就会失效整行的,Cache Line,这会导致Y、Z变量的缓存也失效
虽然只修改了X变量,本应该只失效(更新)X变量的缓存,但Y、Z变量也随之失效,Y、Z变量的数据没有修改,本应该 很好地被 CPU1 和 CPU2 共享(使得缓存存在,让效率变高),却没做到(那么效率会变低,因为要去内存找了),这就是所谓的"伪共享问题",注意:上面是没有解决的方式,所以你可能会感觉到不适应,看后面就好了
问题的原因是,Y、Z和X变量处在了同一行Cache Line里面,要解决这个问题,需要用到所谓的"缓存行填充", 分别在X、Y、Z后面加上7个无用的Long型,填充整个缓存行,让X、Y、Z处在三行不同的缓存行中,如下图所示:
声明一个@jdk.internal.vm.annotation.Contended即可实现缓存行的填充,之所以这个地方要用缓存行填 充,是为了不让Cell[]数组中相邻的元素落到同一个缓存行里,那么由于他们都各自不是同一个行,所以不会互相影响,但是这样需要更多的空间,即使用空间来提高效率(因为缓存没了,效率会变低的)
LongAdder核⼼实现:
上面基本都是优化的方式,现在回归正题,他是如何实现原子的,即对应的加和减操作的原子(因为我们是与对应的AtomicLong是基本一样的操作,而AtomicLong与AtomicInteger也说基本一样的操作)
所以下面来看LongAdder最核⼼的累加方法add(long x),自增、自减操作都是通过调用该方法实现的(因为我们可以操作正数和负数,使得操作自增和自减,并且他不是固定像AtomicLong或者AtomicInteger一样的操作1或者-1,而是自定义的):
public void add ( long x) {
Cell [ ] cs; long b, v; int m; Cell c;
if ( ( cs = cells) != null || ! casBase ( b = base, b + x) ) {
boolean uncontended = true ;
if ( cs == null || ( m = cs. length - 1 ) < 0 ||
( c = cs[ getProbe ( ) & m] ) == null ||
! ( uncontended = c. cas ( v = c. value, v + x) ) )
longAccumulate ( x, null , uncontended) ;
}
}
public void increment ( ) {
add ( 1L ) ;
}
public void decrement ( ) {
add ( - 1L ) ;
}
final boolean casBase ( long cmp, long val) {
return BASE . compareAndSet ( this , cmp, val) ;
}
private static final VarHandle BASE ;
final boolean cas ( long cmp, long val) {
return VALUE . compareAndSet ( this , cmp, val) ;
}
private static final VarHandle VALUE ;
当一个线程调用add(x)的时候,⾸先在对应数值为null下,会尝试使用casBase把x加到base变量上,如果不成功(因为!,所以不成功就为true,即往后执行),但是就不会继续执行另外一个CAS了,因为cs == null 为true,对于||来说,后面的不操作,若对于的不为null,那么第一次不会尝试,那么考虑再用 c.cas(…)方 法尝试把 x 加到 Cell 数组的某个元素上,如果还不成功,最后再调用longAccumulate(…)方法,所以对应的cell在null或者不为null下,都有对应的CAS操作,并且CAS只会有一个执行,而不会重复操作
注意:Cell[]数组的大小始终是2的整数次方,在运行中会不断扩容,每次扩容都是增长2倍,上面代码中的cs[getProbe() & m] 其实就是对数组的大小取模,因为m=cs.length–1,getProbe()为该线程⽣成一个随机数, 用该随机数对数组的长度取模,因为数组长度是2的整数次方,所以可以用&操作来优化取模运算,对于一个线程来说,它并不在意到底是把x累加到base上面,还是累加到Cell[]数组上面,只要累加成功就可 以,因此,这里使用随机数来实现Cell的长度取模
如果对应的尝试都是不成功的,通常则都会调用 longAccumulate(…)方法,该方法在 Striped64 里面,其中LongAccumulator也会 用到(LongAdder、LongAccumulator),如下所示:
final void longAccumulate ( long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ( ( h = getProbe ( ) ) == 0 ) {
ThreadLocalRandom . current ( ) ;
h = getProbe ( ) ;
wasUncontended = true ;
}
boolean collide = false ;
done: for ( ; ; ) {
Cell [ ] cs; Cell c; int n; long v;
if ( ( cs = cells) != null && ( n = cs. length) > 0 ) {
if ( ( c = cs[ ( n - 1 ) & h] ) == null ) {
if ( cellsBusy == 0 ) {
Cell r = new Cell ( x) ;
if ( cellsBusy == 0 && casCellsBusy ( ) ) {
try {
Cell [ ] rs; int m, j;
if ( ( rs = cells) != null &&
( m = rs. length) > 0 &&
rs[ j = ( m - 1 ) & h] == null ) {
rs[ j] = r;
break done;
}
} finally {
cellsBusy = 0 ;
}
continue ;
}
}
collide = false ;
}
else if ( ! wasUncontended)
wasUncontended = true ;
else if ( c. cas ( v = c. value,
( fn == null ) ? v + x : fn. applyAsLong ( v, x) ) )
break ;
else if ( n >= NCPU || cells != cs)
collide = false ;
else if ( ! collide)
collide = true ;
else if ( cellsBusy == 0 && casCellsBusy ( ) ) {
try {
if ( cells == cs)
cells = Arrays . copyOf ( cs, n << 1 ) ;
} finally {
cellsBusy = 0 ;
}
collide = false ;
continue ;
}
h = advanceProbe ( h) ;
}
else if ( cellsBusy == 0 && cells == cs && casCellsBusy ( ) ) {
try {
if ( cells == cs) {
Cell [ ] rs = new Cell [ 2 ] ;
rs[ h & 1 ] = new Cell ( x) ;
cells = rs;
break done;
}
} finally {
cellsBusy = 0 ;
}
}
else if ( casBase ( v = base,
( fn == null ) ? v + x : fn. applyAsLong ( v, x) ) )
break done;
}
}
由于自旋的存在(无限循环),使得失败时,进行阻塞,那么当对应操作完毕后,由于会解除阻塞,那么通常进行添加,那么也会操作扩容
面LongAccumulator:
LongAccumulator的原理和LongAdder类似,只是功能更强大,下面为两者构造方法的对比:
public LongAccumulator ( LongBinaryOperator accumulatorFunction,
long identity) {
this . function = accumulatorFunction;
base = this . identity = identity;
}
LongAdder只能进行累加操作,并且初始值默认为0,LongAccumulator可以自己定义一个二元操作符,并且 可以传入一个初始值
@FunctionalInterface
public interface LongBinaryOperator {
long applyAsLong ( long left, long right) ;
}
操作符的左值,就是base变量或者Cells[]中元素的当前值,右值,就是add()方法传入的参数x,即可以选择一个进行操作,
下面是LongAccumulator的accumulate(x)方法,与LongAdder的add(x)方法类似,最后都是调用的Striped64的LongAccumulate(…)方法
public void accumulate ( long x) {
Cell [ ] cs; long b, v, r; int m; Cell c;
if ( ( cs = cells) != null
|| ( ( r = function. applyAsLong ( b = base, x) ) != b
&& ! casBase ( b, r) ) ) {
boolean uncontended = true ;
if ( cs == null
|| ( m = cs. length - 1 ) < 0
|| ( c = cs[ getProbe ( ) & m] ) == null
|| ! ( uncontended =
( r = function. applyAsLong ( v = c. value, x) ) == v
|| c. cas ( v, r) ) )
longAccumulate ( x, function, uncontended) ;
}
}
唯一的差别就是LongAdder的add(x)方法调用的是casBase(b, b+x),这里调用的是casBase(b, r),其中,r=function.applyAsLong(b=base, x),也就是自己再次的操作修改值,可以用来满足某些条件,比如什么值不能设置等等,比如不能是100
DoubleAdder与DoubleAccumulator:
DoubleAdder 其实也是用 long 型实现的,因为没有 double 类型的 CAS 方法,下面是DoubleAdder的add(x)方法,和LongAdder的add(x)方法基本一样,只是多了long和double类型的相互转换
public void add ( double x) {
Cell [ ] cs; long b, v; int m; Cell c;
if ( ( cs = cells) != null ||
! casBase ( b = base,
Double . doubleToRawLongBits
( Double . longBitsToDouble ( b) + x) ) ) {
boolean uncontended = true ;
if ( cs == null || ( m = cs. length - 1 ) < 0 ||
( c = cs[ getProbe ( ) & m] ) == null ||
! ( uncontended = c. cas ( v = c. value,
Double . doubleToRawLongBits
( Double . longBitsToDouble ( v) + x) ) ) )
doubleAccumulate ( x, null , uncontended) ;
}
}
其中的关键Double.doubleToRawLongBits(Double.longBitsToDouble(b) + x),在读出来的时候,它把 long 类型转换成 double 类型,然后进行累加,累加的结果再转换成 long 类型,通过CAS写回去,这是保证double的相加完整,因为浮点类型和整型类型的互相操作(如相加)需要进行转换,否则报错(编译期报错,自然会使得运行时(或者点击运行后)报错),而我们操作的是double类型,那么操作之前就要变成与我们的一致,这样就不会使得省略一些数据,虽然反过来的结果一样,但也只是防止而已
DoubleAccumulate也是Striped64的成员方法,和longAccumulate类似,也是多了long类型和double类型的 互相转换
DoubleAccumulator和DoubleAdder的关系,与LongAccumulator和LongAdder的关系类似,只是多了一个 二元操作符
Lock与Condition :
互斥锁:
锁的可重入性:
"可重入锁"是指当一个线程调用 object.lock()获取到锁,进入临界区后,再次调用object.lock(),仍然可以获取 到该锁,显然,通常的锁都要设计成可重入的,否则就会发⽣死锁
synchronized关键字,就是可重入锁,如下所示:
在一个synchronized方法method1()里面调用另外一个synchronized方法method2(),如果synchronized关键 字不可重入,那么在method2()处就会发⽣阻塞,这显然不可行
public void synchronized method1 ( ) {
method2 ( ) ;
}
public void synchronized method2 ( ) {
}
类继承层次 :
在正式介绍锁的实现原理之前,先看一下 Concurrent 包中的与互斥锁(ReentrantLock)相关类之间的继承 层次,如下图所示:
上面虚线的白色指向,代表是其子类,但是是实现接口的方式,而黑色的指向,代表是其内部类,单纯白色(实线,不是虚线)的指向是其子类,但是是继承类的方式
Lock是一个接口,其定义如下:
public interface Lock {
void lock ( ) ;
void lockInterruptibly ( ) throws InterruptedException ;
boolean tryLock ( ) ;
boolean tryLock ( long time, TimeUnit unit) throws InterruptedException ;
void unlock ( ) ;
Condition newCondition ( ) ;
}
常用的方法是lock()/unlock(),lock()不能被中断,对应的lockInterruptibly()可以被中断
ReentrantLock本身除了内部类外没有代码逻辑,基本实现都在其内部类Sync中:
public class ReentrantLock implements Lock , java. io. Serializable {
private final Sync sync;
public void lock ( ) {
sync. acquire ( 1 ) ;
}
public void unlock ( ) {
sync. release ( 1 ) ;
}
}
锁的公平性vs⾮公平性:
Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别对应公平锁和⾮公平锁,从下面的ReentrantLock构造方法可以看出,会传入一个布尔类型的变量fair指定锁是公平的还是⾮公平的,默认为⾮公平 的
public ReentrantLock ( ) {
sync = new NonfairSync ( ) ;
}
public ReentrantLock ( boolean fair) {
sync = fair ? new FairSync ( ) : new NonfairSync ( ) ;
}
注意:公平一般代码队列的特性,而不公平一般代表栈的特性,但是这些只是极少数的,一般来说公平代表对应关系,或者说有顺序的关系,而不公平代表没有绝对的顺序,或者开始有顺序,但是操作时没有顺序,通常lock和synchronized就是这样,而之前的SynchronousQueue(一个阻塞队列)的说明就是队列和栈的特性
什么叫公平锁和⾮公平锁呢?先举个现实⽣活中的例子,一个人去⽕⻋站售票窗口买票,发现现场有人排队, 于是他排在队伍末尾,遵循先到者优先服务的规则,这叫公平,如果他去了不排队,直接冲到窗口买票,这叫作不 公平,一般来说,操作了对应的公平的线程叫做公平锁,而操作不公平的线程叫做不公平锁,其中lock和synchronized默认是不公平的,lock可以设置公平,但是synchronized基本不能,为了让你理解他们两个的公平说明,这里给出例子:在比赛跑步是,总共有100米,现在我们将小明放在50米,小军放在0米,很明显,小明在前面,如果是公平的,那么我们在小明后面放一个墙壁(可以认为是其他的方法判断,由于CAS存在,就算多个线程进入,后续的判断中的CAS基本也只能一个成功,所以该方法也相当于是CAS操作了),随着小明移动,那么就算小军速度再快,由于墙壁的原因,那么他始终不能超过小明,这就是公平,如果是不公平的,那么没有墙壁,即对应的小军可以超过小明,即更快的选择冠军,很明显,如果是不公平的,那么一般可以更快的选择冠军,因为在公平的情况下,快的确有可能需要等待慢的,所以通常我们也说不公平效率比公平效率高,因为不公平冠军基本更加容易出现(之所以说基本,是因为如果第一名本来是最快的,那么是否公平基本都是一样的时间得到冠军,但是这个只有第一名最快,使得是否公平效率都一样,但是其他情况不公平效率都比公平效率高),所以一般来说lock和synchronized默认都是不公平的
对应到锁的例子,一个新的线程来了之后,看到有很多线程在排队,自己排到队伍末尾(有墙壁),这叫公平,线程来了 之后直接去抢锁(没有墙壁),这叫作不公平,默认设置的是⾮公平锁,其实是为了提高效率,减少线程切换
锁实现的基本原理:
Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS),这个类⾮常重要,该类的父类是AbstractOwnableSynchronizer,由于大多数操作都会使用AbstractQueuedSynchronizer来进行操作阻塞队列(只是大多数,有些不是,比如前面说明的ConcurrentLinkedQueue类),所以一般我们将阻塞的队列称为AQS,此处的锁具备synchronized功能(既然只是具备,自然我们需要使用CAS),即可以阻塞一个线程,为了实现一把具有阻塞或唤醒功能的锁,需要⼏个核 ⼼要素:
1:需要一个state变量,标记该锁的状态,state变量⾄少有两个值:0、1(是至少,后面会说明),对state变量的操作,使用CAS保 证线程安全
2:需要记录当前是哪个线程持有锁
3:需要底层⽀持对一个线程进行阻塞或唤醒操作
4:需要有一个队列维护所有阻塞的线程,这个队列也必须是线程安全的无锁队列,也需要使用CAS
其中后面会在原语说明中进行CAS和synchronized的核心说明
针对要素1和2,在上面两个类中有对应的体现:
public abstract class AbstractOwnableSynchronizer implements java. io. Serializable {
private transient Thread exclusiveOwnerThread;
}
public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer implements java. io. Serializable {
private volatile int state;
}
state取值不仅可以是0、1,还可以大于1(所以前面说明的是至少),就是为了⽀持锁的可重入性,例如,同样一个线程,调用5次lock,state会变成5,然后调用5次unlock,state减为0
当state=0时,没有线程持有锁,exclusiveOwnerThread=null
当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程
当state > 1时,说明该线程重入了该锁
对于要素3,Unsafe类提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark
public native void unpark ( Object thread) ;
public native void park ( boolean isAbsolute, long time) ;
有一个LockSupport的工具类,对这一对原语做了简单封装:
public class LockSupport {
private static final Unsafe U = Unsafe . getUnsafe ( ) ;
public static void park ( ) {
U . park ( false , 0L ) ;
}
public static void unpark ( Thread thread) {
if ( thread != null )
U . unpark ( thread) ;
}
}
我们可以发现,他是一个工具类,也就是说,我们也能够进行使用他 ,一般的,可以直接操作并且是原子的或者最终会操作原语的主要调用,我们称为原语操作类,即可以操作原语的类,一般synchronized代码块主要是靠monitorenter和monitorexit这两个原语来实现同步的,但是我们也知道synchronized的原语有中间操作,因为他操作JVM阻塞(前面有过说明),但是最终还是操作系统阻塞,而CAS直接操作系统阻塞,所以通常CAS效率比synchronized高,那么换言之lock效率比synchronized高,这是因为中间操作需要时间,所以synchronized比较慢点,其中这个中间操作也是原语的,就算不是原语,由于被原语包括,那么我们也称为原语操作,那么很明显,CAS也就是类似于原语的一种,因为底层也是使用原语来完成的(native),我们可以看一下上面的代码,可以发现,对应的park和unpark都是一个native方法,具体如何操作原语那就不是java该做的了,具体可以学习C/C++或者汇编(自己去学习吧)或者其他可以操作原语的语言(虽然基本没有)
在当前线程中调用park(),该线程就会被阻塞,在另外一个线程中,调用unpark(Thread thread),传入一 个被阻塞的线程,就可以唤醒阻塞在park()地方的线程
unpark(Thread thread),它实现了一个线程对另外一个线程的"精准唤醒",notify也只是唤醒某一个线程,但 无法指定具体唤醒哪个线程,而所谓的精准唤醒指的是:对应的解锁或者唤醒只是操作最近的一个锁或者指定一个解锁(一般说明的是指定)
虽然lock在抢占方面对应的是可能有顺序的,但是在唤醒方面,可以认为是在对应的线程对象中进行操作
其中对于可重入操作,可以认为内部加上一个阻塞或者说无限循环或者是某种标识,认为没有释放,而对应的释放,只是释放该一个无限循环或者只是减少某个值,当最后一个或者说释放(唤醒)的对应值为0了,那么才会进行真正的释放,一般我们只会考虑标识的操作(在后面的unlock的说明中会体现),其中在抢占锁后,会删除对应的线程对象或者释放锁后,对应的某个变量设置为null(在后面的unlock会说明),这个线程对象在后面会说明的
针对要素4,在AQS中利用双向链表和CAS实现了一个阻塞队列,如下所示:
public abstract class AbstractQueuedSynchronizer {
static final class Node {
volatile Thread thread;
volatile Node prev;
volatile Node next;
}
private transient volatile Node head;
private transient volatile Node tail;
}
阻塞队列是整个AQS核⼼中的核⼼,如下图所示,head指向双向链表头部,tail指向双向链表尾部,入队就是 把新的Node加到tail后面,然后对tail进行CAS操作,最终tail指向新加入的线程对象(这里就是前面说的线程对象),出队就是对head进行CAS操作,然后把head向后移一个位置
初始的时候,head=tail=NULL,然后,在往队列中加入阻塞的线程时,会新建一个空的Node,让head和tail都指向这个空Node,之后,在后面加入被阻塞的线程对象,tail然后指向他,所以,当head=tail的时候,说明队列为空
公平与⾮公平的lock()实现差异 :
下面分析基于AQS,ReentrantLock在公平性和⾮公平性上的实现差异:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L ;
protected final boolean tryAcquire ( int acquires) {
return nonfairTryAcquire ( acquires) ;
}
}
@ReservedStackAccess
final boolean nonfairTryAcquire ( int acquires) {
final Thread current = Thread . currentThread ( ) ;
int c = getState ( ) ;
if ( c == 0 ) {
if ( compareAndSetState ( 0 , acquires) ) {
setExclusiveOwnerThread ( current) ;
return true ;
}
}
else if ( current == getExclusiveOwnerThread ( ) ) {
int nextc = c + acquires;
if ( nextc < 0 )
throw new Error ( "Maximum lock count exceeded" ) ;
setState ( nextc) ;
return true ;
}
return false ;
}
@ReservedStackAccess
protected final boolean tryAcquire ( int acquires) {
final Thread current = Thread . currentThread ( ) ;
int c = getState ( ) ;
if ( c == 0 ) {
if ( ! hasQueuedPredecessors ( ) &&
compareAndSetState ( 0 , acquires) ) {
setExclusiveOwnerThread ( current) ;
return true ;
}
}
else if ( current == getExclusiveOwnerThread ( ) ) {
int nextc = c + acquires;
if ( nextc < 0 )
throw new Error ( "Maximum lock count exceeded" ) ;
setState ( nextc) ;
return true ;
}
return false ;
}
}
阻塞队列与唤醒机制:
下面进入锁的最为关键的部分,即acquireQueued(…)方法内部一探究竟:
public void lock ( ) {
sync. acquire ( 1 ) ;
}
public final void acquire ( int arg) {
if ( ! tryAcquire ( arg) &&
acquireQueued ( addWaiter ( Node . EXCLUSIVE ) , arg) )
selfInterrupt ( ) ;
}
先说addWaiter(…)方法,就是为当前线程⽣成一个Node,然后把Node放入双向链表的尾部,要注意的是,这 只是把Thread对象放入了一个队列中而已,线程本身并未阻塞
private Node addWaiter ( Node mode) {
Node node = new Node ( mode) ;
for ( ; ; ) {
Node oldTail = tail;
if ( oldTail != null ) {
node. setPrevRelaxed ( oldTail) ;
if ( compareAndSetTail ( oldTail, node) ) {
oldTail. next = node;
return node;
}
} else {
initializeSyncQueue ( ) ;
}
}
}
private final void initializeSyncQueue ( ) {
Node h;
if ( HEAD . compareAndSet ( this , null , ( h = new Node ( ) ) ) )
tail = h;
}
创建节点,尝试将节点追加到队列尾部,获取tail节点,将tail节点的next设置为当前节点
如果tail不存在或者说为null,就初始化队列,创建一个空线程对象
在addWaiter(…)方法把Thread对象加入阻塞队列,而之后的阻塞工作就要靠acquireQueued(…)方法完成,线程一旦进 入acquireQueued(…)就会被无限期阻塞,即使有其他线程调用interrupt()方法也不能将其唤醒,除⾮有其他线程 释放了锁,并且该线程拿到了锁,才会从accquireQueued(…)返回
进入acquireQueued(…),该线程被阻塞,在该方法返回的一刻,就是拿到锁的那一刻,也就是被唤醒的那一 刻,此时会删除队列的第一个元素(head指针前移1个节点),那么我们看一下acquireQueued方法
final boolean acquireQueued ( final Node node, int arg) {
boolean interrupted = false ;
try {
for ( ; ; ) {
final Node p = node. predecessor ( ) ;
if ( p == head && tryAcquire ( arg) ) {
setHead ( node) ;
p. next = null ;
return interrupted;
}
if ( shouldParkAfterFailedAcquire ( p, node) )
interrupted |= parkAndCheckInterrupt ( ) ;
}
} catch ( Throwable t) {
cancelAcquire ( node) ;
if ( interrupted)
selfInterrupt ( ) ;
throw t;
}
}
⾸先,acquireQueued(…)方法有一个返回值,表示什么意思呢,虽然该方法不会中断响应,但它会记录被阻 塞期间有没有其他线程向它发送过中断信号,如果有,则该方法会返回true,否则,返回false(除非被其他人又操作回来了),当然他之所以是记录,是因为他并没有进行处理(即没有结束对应的方法操作,而是继续执行方法)
基于这个返回值,才有了下面的代码:
static void selfInterrupt ( ) {
Thread . currentThread ( ) . interrupt ( ) ;
}
当 acquireQueued(…)返回 true 时,会调用 selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的 中断标志位设为true,之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进 行补偿,因为我还在阻塞当中(虽然对应的原语是处理中断的,但是可能还会阻塞,所以如果对方操作中断,并且又设置回去,那么实际上我操作了中断,但是没有响应,虽然你设置回去了,但是具体的变量我已经保存了,你设置回去没有用,我必须要操作中断,因为中断的操作是严谨的,并且你在中断时,再次的设置回去需要的时间比我直接得到返回需要的时间长,导致你只要操作了中断,我保存的就是中断的true信息,而不会让你多次的操作使得又回来变成false,即不会使得再次的设置回去,就算之后直接操作也是一样的),只要我获得了锁,那么会操作这个,使得对应锁内部操作进行中断,这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响应该中断信 号
阻塞就发⽣在下面这个方法中:
private final boolean parkAndCheckInterrupt ( ) {
LockSupport . park ( this ) ;
return Thread . interrupted ( ) ;
}
线程调用 park()方法,自己把自己阻塞起来,直到被其他线程唤醒,该方法返回
park()方法返回有两种情况:
1:其他线程调用了unpark(Thread t)
2:其他线程调用了t.interrupt()(t代表对应的线程,因为中断基本只能有对方自身调用),这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中 断,因为lock他并没有对应的报错信息,而你的报错只是解除阻塞而已,即内部操作了,没有给lock带来影响,所以说lock不响应中断(但是他只是没有直接响应而已,而是自己给内部代码进行响应,但是内部代表又不是lock,所以综上所述,lock的确没有中断响应操作),也正因为LockSupport.park()可能被中断唤醒,acquireQueued(…)方法才写了一个for死循环,唤醒之后,如 果发现自己排在队列头部,就去拿锁,如果拿不到锁,则再次自己阻塞自己,不断重复此过程,直到拿到锁,简单来说,就是自旋然后阻塞,这个自旋就是主要的操作
被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒,如果是情况1,会返回false,如果是情况2, 则返回true
public final void acquire ( int arg) {
if ( ! tryAcquire ( arg) &&
acquireQueued ( addWaiter ( Node . EXCLUSIVE ) , arg) )
selfInterrupt ( ) ;
}
unlock()实现分析:
说完了lock,下面分析unlock的实现,unlock不区分公平还是⾮公平(没有对应的tryAcquire操作,即没有具体操作是否公平的操作)
public void unlock ( ) {
sync. release ( 1 ) ;
}
public final boolean release ( int arg) {
if ( tryRelease ( arg) ) {
Node h = head;
if ( h != null && h. waitStatus != 0 )
unparkSuccessor ( h) ;
return true ;
}
return false ;
}
上面代码中,当前线程要释放锁,先调用tryRelease(arg)方法,如果返回true,则取出head,让head获取锁
对于tryRelease方法:
@ReservedStackAccess
protected final boolean tryRelease ( int releases) {
int c = getState ( ) - releases;
if ( Thread . currentThread ( ) != getExclusiveOwnerThread ( ) )
throw new IllegalMonitorStateException ( ) ;
boolean free = false ;
if ( c == 0 ) {
free = true ;
setExclusiveOwnerThread ( null ) ;
}
setState ( c) ;
return free;
}
⾸先计算当前线程释放锁后的state值,如果当前线程不是排他线程,则抛异常,因为只有获取锁的线程才可以进行释放锁的操作,此时设置state,没有使用CAS,因为是单线程操作
再看unparkSuccessor方法:
private void unparkSuccessor ( Node node) {
int ws = node. waitStatus;
if ( ws < 0 )
node. compareAndSetWaitStatus ( ws, 0 ) ;
Node s = node. next;
if ( s == null || s. waitStatus > 0 ) {
s = null ;
for ( Node p = tail; p != node && p != null ; p = p. prev)
if ( p. waitStatus <= 0 )
s = p;
}
if ( s != null )
LockSupport . unpark ( s. thread) ;
}
release()里面做了两件事:tryRelease(…)方法释放锁,unparkSuccessor(…)方法唤醒队列中的后继者(头节点),因为第一个就是,所以综上所述,实际上公平和不公平只是决定了阻塞队列的顺序,但是具体的顺序还是固定的,所以前面也认为公平问题是对抢占问题的描述,而不是对获取锁的描述,所以当出现了lock或者synchronized他们是有顺序的说明,那么这是对阻塞队列顺序的说明,因为经历了多个线程的不规则进入,导致的顺序,所以如果出现说他们的抢占是随机的,那么是产生阻塞队列之前的抢占,如果是说有顺序的,那么说明的是抢占只后的阻塞队列,即这样的说明
lockInterruptibly()实现分析:
上面的 lock 不能被中断,这里的 lockInterruptibly可以被中断:
public void lockInterruptibly ( ) throws InterruptedException {
sync. acquireInterruptibly ( 1 ) ;
}
public final void acquireInterruptibly ( int arg)
throws InterruptedException {
if ( Thread . interrupted ( ) )
throw new InterruptedException ( ) ;
if ( ! tryAcquire ( arg) )
doAcquireInterruptibly ( arg) ;
}
这里的 acquireInterruptibly(…)也是 AQS 的模板方法,里面的 tryAcquire(…)也是分别被 FairSync和NonfairSync实现
主要看doAcquireInterruptibly(…)方法:
private void doAcquireInterruptibly ( int arg)
throws InterruptedException {
final Node node = addWaiter ( Node . EXCLUSIVE ) ;
try {
for ( ; ; ) {
final Node p = node. predecessor ( ) ;
if ( p == head && tryAcquire ( arg) ) {
setHead ( node) ;
p. next = null ;
return ;
}
if ( shouldParkAfterFailedAcquire ( p, node) &&
parkAndCheckInterrupt ( ) )
throw new InterruptedException ( ) ;
}
} catch ( Throwable t) {
cancelAcquire ( node) ;
throw t;
}
}
当parkAndCheckInterrupt()返回true的时候,说明有其他线程发送中断信号,直接抛出InterruptedException,跳出for循环,整个方法返回(介绍),也就没有阻塞了
tryLock()实现分析 :
public boolean tryLock ( ) {
return sync. nonfairTryAcquire ( 1 ) ;
}
tryLock()实现基于调用⾮公平锁的tryAcquire(…),对state进行CAS操作,如果操作成功就拿到锁,如果操作不 成功则直接返回false,也不阻塞,也没有中断的处理,也就只能抢一次,只能不会可以再操作了,即他并没有真正的操作是否公平,因为他是固定的
读写锁 :
和互斥锁相比,读写锁(ReentrantReadWriteLock)就是读线程和读线程之间不互斥,读读不互斥,读写互斥,写写互斥,而不是与互斥锁一样,基本所有都互斥
类继承层次 :
ReadWriteLock是一个接口,内部由两个Lock接口组成
public interface ReadWriteLock {
Lock readLock ( ) ;
Lock writeLock ( ) ;
}
同理,黑色指向是其内部类,白色指向是其子类(是继承,因为不是虚线,若是虚线那么就是实现)
ReentrantReadWriteLock实现了该接口,使用方式如下(之前操作过ReentrantLock的大致使用方式,所以就没有给出具体的ReentrantLock的使用方式了,但这个ReentrantReadWriteLock没有使用过,所以这里就给出ReentrantReadWriteLock的使用方式):
ReadWriteLock readWriteLock = new ReentrantReadWriteLock ( ) ;
Lock readLock = readWriteLock. readLock ( ) ;
readLock. lock ( ) ;
readLock. unlock ( ) ;
Lock writeLock = readWriteLock. writeLock ( ) ;
writeLock. lock ( ) ;
writeLock. unlock ( ) ;
也就是说,当使用 ReadWriteLock 的时候,并不是直接使用,而是获得其内部的读锁和写锁,然后分别调lock/unlock
读写锁实现的基本原理 :
从表面来看,ReadLock和WriteLock是两把锁,实际上它只是同一把锁的两个视图而已,什么叫两个视图呢? 可以理解为是一把锁,但是却将线程分成两类:读线程和写线程,读线程和写线程之间互斥(一把锁,要么写线程获取,要 么读线程获取),读线程之间不互斥,写线程之间互斥
从下面的构造方法也可以看出,readerLock和writerLock实际共用同一个sync对象,sync对象同互斥锁一样, 分为⾮公平和公平两种策略,并继承自AQS
public ReentrantReadWriteLock ( ) {
this ( false ) ;
}
public ReentrantReadWriteLock ( boolean fair) {
sync = fair ? new FairSync ( ) : new NonfairSync ( ) ;
readerLock = new ReadLock ( this ) ;
writerLock = new WriteLock ( this ) ;
}
同互斥锁一样,读写锁也是用state变量来表示锁状态的,只是state变量在这里的含义和互斥锁完全不同,在 内部类Sync中,对state变量进行了重新定义,如下所示:
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16 ;
static final int SHARED_UNIT = ( 1 << SHARED_SHIFT ) ;
static final int MAX_COUNT = ( 1 << SHARED_SHIFT ) - 1 ;
static final int EXCLUSIVE_MASK = ( 1 << SHARED_SHIFT ) - 1 ;
static int sharedCount ( int c) { return c >>> SHARED_SHIFT ; }
static int exclusiveCount ( int c) { return c & EXCLUSIVE_MASK ; }
}
也就是把 state 变量(int类型,有32位)拆成两半(注意并不是真正的两半,只是重新定义而已,或者说操作我定义的变量,而不是只操作state变量或者不操作state变量,当然由于是定义,可能并没有使用他(一般来说都会使用),即可能还是操作state,因为定义了并非一定使用),低16位,用来记录写锁,但同一时间既然只能有一个线程写,为什么还需要16位呢?这是因为一个写线程可能多次重入,例如,低16位的值等于5,表示一个写线程重入了5次
高16位,用来"读"锁,例如,高16位的值等于5,既可以表示5个读线程都拿到了该锁,也可以表示一个读线程 重入了5次
为什么要把一个int类型变量拆成两半,而不是用两个int型变量分别表示读锁和写锁的状态呢?
这是因为无法用一次CAS 同时操作两个int变量(一般只能操作一个),所以用了一个int型的高16位和低16位分别表示读锁和写锁的 状态
当state=0时,说明既没有线程持有读锁,也没有线程持有写锁,当state != 0时,要么有线程持有读锁,要么 有线程持有写锁,两者不能同时成立,因为读和写互斥,这时再进一步通过sharedCount(state)和exclusiveCount(state)判断到底是读线程还是写线程持有了该锁
AQS面的两对模板方法 :
下面介绍在ReentrantReadWriteLock的两个内部类ReadLock和WriteLock中,是如何使用state变量的
public static class ReadLock implements Lock , java. io. Serializable {
public void lock ( ) {
sync. acquireShared ( 1 ) ;
}
public void unlock ( ) {
sync. releaseShared ( 1 ) ;
}
}
public static class WriteLock implements Lock , java. io. Serializable {
public void lock ( ) {
sync. acquire ( 1 ) ;
}
public void unlock ( ) {
sync. release ( 1 ) ;
}
}
acquire/release、acquireShared/releaseShared 是AQS里面的两对模板方法,互斥锁和读写锁的写锁都是基 于acquire/release模板方法来实现的,读写锁的读锁是基于acquireShared/releaseShared这对模板方法来实现 的,这两对模板方法的代码如下:
public void lock ( ) {
sync. acquire ( 1 ) ;
}
public final void acquire ( int arg) {
if ( ! tryAcquire ( arg) &&
acquireQueued ( addWaiter ( Node . EXCLUSIVE ) , arg) )
selfInterrupt ( ) ;
}
public void unlock ( ) {
sync. release ( 1 ) ;
}
public final boolean release ( int arg) {
if ( tryRelease ( arg) ) {
Node h = head;
if ( h != null && h. waitStatus != 0 )
unparkSuccessor ( h) ;
return true ;
}
return false ;
}
public void lock ( ) {
sync. acquireShared ( 1 ) ;
}
public final void acquireShared ( int arg) {
if ( tryAcquireShared ( arg) < 0 )
doAcquireShared ( arg) ;
}
public void unlock ( ) {
sync. releaseShared ( 1 ) ;
}
public final boolean releaseShared ( int arg) {
if ( tryReleaseShared ( arg) ) {
doReleaseShared ( ) ;
return true ;
}
return false ;
}
其中他们又可以操作是否公平,那么我们可以将读/写、公平/⾮公平进行排列组合,就有4种组合,如下图所示,上面的两个方法都是在Sync中实现的,Sync中的两个方法又是模板方法,在NonfairSync和FairSync中分别有实现,最终的对应关系如下:
1:读锁的公平实现:Sync.tryAccquireShared()+FairSync中的两个重写的子方法(只有其中一个主要操作,读的)
2:读锁的⾮公平实现:Sync.tryAccquireShared()+NonfairSync中的两个重写的子方法(只有其中一个主要操作,读的)
3:写锁的公平实现:Sync.tryAccquire()+FairSync中的两个重写的子方法(只有其中一个主要操作,写的)
4:写锁的⾮公平实现:Sync.tryAccquire()+NonfairSync中的两个重写的子方法(只有其中一个主要操作,写的)
白色的指向是其子类(继承关系)
static final class NonfairSync extends Sync {
private static final long serialVersionUID = - 8159625535654395037L ;
final boolean writerShouldBlock ( ) {
return false ;
}
final boolean readerShouldBlock ( ) {
return apparentlyFirstQueuedIsExclusive ( ) ;
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = - 2274990926593161451L ;
final boolean writerShouldBlock ( ) {
return hasQueuedPredecessors ( ) ;
}
final boolean readerShouldBlock ( ) {
return hasQueuedPredecessors ( ) ;
}
}
对于公平,比较容易理解,不论是读锁,还是写锁,只要队列中有其他线程在排队(排队等读锁,或者排队等 写锁),就不能直接去抢锁,要排在队列尾部
对于⾮公平,读锁和写锁的实现策略略有差异
写线程能抢锁,前提是state=0,只有在没有其他线程持有读锁或写锁的情况下,它才有机会去抢锁,或者state != 0,但那个持有写锁的线程是它自己,再次重入,写线程是⾮公平的,那么即writerShouldBlock()方法一直返 回false
对于读线程,假设当前线程被读线程持有,然后其他读线程还⾮公平地一直去抢,可能导致写线程永远拿不到 锁,所以对于读线程的⾮公平,要做一些"约束",当发现队列的第1个元素是写线程的时候,读线程也要阻塞,不能 直接去抢,即偏向写线程(因为对写锁来说,写锁和读锁都不能抢,但是对于读锁来说,写锁不能抢,但是读锁可以,所以写锁的不能抢是多个的,即偏向写锁的互斥)
综上所述,在满足对于的阻塞队列的添加下,然后操作释放时,看看后面的锁是否互斥,若不互斥,那么他也能得到锁,这是读写锁的根本操作
WriteLock公平vs⾮公平实现
写锁是排他锁,实现策略类似于互斥锁:
public boolean tryLock ( ) {
return sync. tryWriteLock ( ) ;
}
@ReservedStackAccess
final boolean tryWriteLock ( ) {
Thread current = Thread . currentThread ( ) ;
int c = getState ( ) ;
if ( c != 0 ) {
int w = exclusiveCount ( c) ;
if ( w == 0 || current != getExclusiveOwnerThread ( ) )
return false ;
if ( w == MAX_COUNT )
throw new Error ( "Maximum lock count exceeded" ) ;
}
if ( ! compareAndSetState ( c, c + 1 ) )
return false ;
setExclusiveOwnerThread ( current) ;
return true ;
}
lock()方法:
public void lock ( ) {
sync. acquire ( 1 ) ;
}
public final void acquire ( int arg) {
if ( ! tryAcquire ( arg) &&
acquireQueued ( addWaiter ( Node . EXCLUSIVE ) , arg) )
selfInterrupt ( ) ;
}
tryLock方法不区分公平/⾮公平,因为他基本并没有操作是否公平的方法来完成是否公平的操作
unlock()实现分析:
public void unlock ( ) {
sync. release ( 1 ) ;
}
public final boolean release ( int arg) {
if ( tryRelease ( arg) ) {
Node h = head;
if ( h != null && h. waitStatus != 0 )
unparkSuccessor ( h) ;
return true ;
}
return false ;
}
ReadLock公平vs⾮公平实现
读锁是共享锁,其实现策略和排他锁有很大的差异
tryLock()实现分析:
public boolean tryLock ( ) {
return sync. tryReadLock ( ) ;
}
@ReservedStackAccess
final boolean tryReadLock ( ) {
Thread current = Thread . currentThread ( ) ;
for ( ; ; ) {
int c = getState ( ) ;
if ( exclusiveCount ( c) != 0 &&
getExclusiveOwnerThread ( ) != current)
return false ;
int r = sharedCount ( c) ;
if ( r == MAX_COUNT )
throw new Error ( "Maximum lock count exceeded" ) ;
if ( compareAndSetState ( c, c + SHARED_UNIT ) ) {
if ( r == 0 ) {
firstReader = current;
firstReaderHoldCount = 1 ;
} else if ( firstReader == current) {
firstReaderHoldCount++ ;
} else {
HoldCounter rh = cachedHoldCounter;
if ( rh == null ||
rh. tid != LockSupport . getThreadId ( current) )
cachedHoldCounter = rh = readHolds. get ( ) ;
else if ( rh. count == 0 )
readHolds. set ( rh) ;
rh. count++ ;
}
return true ;
}
}
}
public void lock ( ) {
sync. acquireShared ( 1 ) ;
}
public final void acquireShared ( int arg) {
if ( tryAcquireShared ( arg) < 0 )
doAcquireShared ( arg) ;
}
unlock()实现分析:
public void unlock ( ) {
sync. releaseShared ( 1 ) ;
}
public final boolean releaseShared ( int arg) {
if ( tryReleaseShared ( arg) ) {
doReleaseShared ( ) ;
return true ;
}
return false ;
}
tryReleaseShared()的实现:
@ReservedStackAccess
protected final boolean tryReleaseShared ( int unused) {
Thread current = Thread . currentThread ( ) ;
for ( ; ; ) {
int c = getState ( ) ;
int nextc = c - SHARED_UNIT ;
if ( compareAndSetState ( c, nextc) )
return nextc == 0 ;
}
}
因为读锁是共享锁,多个线程会同时持有读锁,所以对读锁的释放不能直接减1,而是需要通过一个for循环+CAS操作不断重试(保证他们都是减的,而不会出现某些问题,因为有同步的问题,这样就是防止出现,虽然有间隙,但是不一定是一直不相同的,前面这个地方有具体说明"因为可能操作时间长"),这是tryReleaseShared和tryRelease的根本差异所在
实际上读写锁的存在,就是为了使得提高在读读的效率,当然,可能其他的情况效率会变低,因为通常有对应的判断的,所以通常来说读写锁是主要操作读读的(后面也会说明写的"饿死"),当然了互斥锁由于整体要好,可能在一定的规模下,我们还是会使用互斥锁,因为这个时候可能读写锁的读读甚至比互斥锁要差(因为可能太卡顿)
Condition :
Condition与Lock的关系:
Condition本身也是一个接口,其功能和wait/notify类似,如下所示:
public interface Condition {
void await ( ) throws InterruptedException ;
boolean await ( long time, TimeUnit unit) throws InterruptedException ;
long awaitNanos ( long nanosTimeout) throws InterruptedException ;
void awaitUninterruptibly ( ) ;
boolean awaitUntil ( Date deadline) throws InterruptedException ;
void signal ( ) ;
void signalAll ( ) ;
}
wait()/notify()必须和synchronized一起使用,Condition也必须和Lock一起使用,因此,在Lock的接口中,有 一个与Condition相关的接口:
public interface Lock {
void lock ( ) ;
void lockInterruptibly ( ) throws InterruptedException ;
Condition newCondition ( ) ;
boolean tryLock ( ) ;
boolean tryLock ( long time, TimeUnit unit) throws InterruptedException ;
void unlock ( ) ;
}
Condition的使用场景:
以ArrayBlockingQueue为例(前面说明过了),如下所示为一个用数组实现的阻塞队列,执行put(…)操作的时候,队列满了, ⽣产者线程被阻塞,执行take()操作的时候,队列为空,消费者线程被阻塞
public class ArrayBlockingQueue < E > extends AbstractQueue < E >
implements BlockingQueue < E > , java. io. Serializable {
final Object [ ] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue ( int capacity, boolean fair) {
if ( capacity <= 0 )
throw new IllegalArgumentException ( ) ;
this . items = new Object [ capacity] ;
lock = new ReentrantLock ( fair) ;
notEmpty = lock. newCondition ( ) ;
notFull = lock. newCondition ( ) ;
}
public void put ( E e) throws InterruptedException {
Objects . requireNonNull ( e) ;
final ReentrantLock lock = this . lock;
lock. lockInterruptibly ( ) ;
try {
while ( count == items. length)
notFull. await ( ) ;
enqueue ( e) ;
} finally {
lock. unlock ( ) ;
}
}
private void enqueue ( E e) {
final Object [ ] items = this . items;
items[ putIndex] = e;
if ( ++ putIndex == items. length) putIndex = 0 ;
count++ ;
notEmpty. signal ( ) ;
}
public E take ( ) throws InterruptedException {
final ReentrantLock lock = this . lock;
lock. lockInterruptibly ( ) ;
try {
while ( count == 0 )
notEmpty. await ( ) ;
return dequeue ( ) ;
} finally {
lock. unlock ( ) ;
}
}
private E dequeue ( ) {
final Object [ ] items = this . items;
@SuppressWarnings ( "unchecked" )
E e = ( E ) items[ takeIndex] ;
items[ takeIndex] = null ;
if ( ++ takeIndex == items. length) takeIndex = 0 ;
count-- ;
if ( itrs != null )
itrs. elementDequeued ( ) ;
notFull. signal ( ) ;
return e;
}
}
具体前面也说明过了,所以这里可以直接大致的过一遍
Condition实现原理:
实际上Condition的使用很方便,避免了wait/notify的⽣产者通知⽣产者、消费者通知消费者的问题,具 体实现如下:
由于Condition必须和Lock一起使用,所以Condition的实现也是Lock的一部分,⾸先查看互斥锁和读写锁中Condition的构造方法:
public class ReentrantLock implements Lock , java. io. Serializable {
public Condition newCondition ( ) {
return sync. newCondition ( ) ;
}
}
public class ReentrantReadWriteLock
implements ReadWriteLock , java. io. Serializable {
private final ReentrantReadWriteLock. ReadLock readerLock;
private final ReentrantReadWriteLock. WriteLock writerLock;
public static class ReadLock implements Lock , java. io. Serializable {
public Condition newCondition ( ) {
throw new UnsupportedOperationException ( ) ;
}
}
public static class WriteLock implements Lock , java. io. Serializable {
public Condition newCondition ( ) {
return sync. newCondition ( ) ;
}
}
}
⾸先,读写锁中的 ReadLock 是不⽀持 Condition 的,读写锁的写锁或者说互斥锁就⽀持Condition,虽然它们各 自调用的是自己的内部类Sync的,但是内部类Sync里面的操作是创建AQS对应的内部类对象的,因此,上面的代码sync.newCondition最终得到了AQS里面的内部类对象,即ConditionObject,他是实现了Condition的
public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer
implements java. io. Serializable {
public class ConditionObject implements Condition , java. io. Serializable {
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition ( ) {
return new ConditionObject ( ) ;
}
}
每一个Condition对象上面,都可能阻塞了多个线程,因此,在ConditionObject内部也有一个双向链表组成的队 列,如下所示:
public class ConditionObject implements Condition , java. io. Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
}
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
下面来看一下在await()/notify()方法中,是如何使用这个队列的
await()实现分析:
public final void await ( ) throws InterruptedException {
if ( Thread . interrupted ( ) )
throw new InterruptedException ( ) ;
Node node = addConditionWaiter ( ) ;
int savedState = fullyRelease ( node) ;
int interruptMode = 0 ;
while ( ! isOnSyncQueue ( node) ) {
LockSupport . park ( this ) ;
if ( ( interruptMode = checkInterruptWhileWaiting ( node) ) != 0 )
break ;
}
if ( acquireQueued ( node, savedState) && interruptMode != THROW_IE )
interruptMode = REINTERRUPT ;
if ( node. nextWaiter != null )
unlinkCancelledWaiters ( ) ;
if ( interruptMode != 0 )
reportInterruptAfterWait ( interruptMode) ;
}
关于await,有⼏个关键点要说明:
1:线程调用 await()的时候,肯定已经先拿到了锁,因为要操作对应的对象,必然是在lock里面,所以是拿到锁的,所以,在 addConditionWaiter()内部,对这个双向链表 的操作不需要执行CAS操作,线程天⽣是安全的,代码如下:
private Node addConditionWaiter ( ) {
if ( ! isHeldExclusively ( ) )
throw new IllegalMonitorStateException ( ) ;
Node t = lastWaiter;
if ( t != null && t. waitStatus != Node . CONDITION ) {
unlinkCancelledWaiters ( ) ;
t = lastWaiter;
}
Node node = new Node ( Node . CONDITION ) ;
if ( t == null )
firstWaiter = node;
else
t. nextWaiter = node;
lastWaiter = node;
return node;
}
2:在线程执行阻塞操作之前,必须先释放锁,也就是fullyRelease(node),否则会发⽣死锁,这个他们直接的唤醒和释放的操作和wait/notify与synchronized的配合机制基本一样
3:线程从wait(阻塞,这里只是代表阻塞的意思,而不是对应的wait方法哦)中被唤醒后,必须用acquireQueued(node, savedState)方法重新拿锁
4:checkInterruptWhileWaiting(node)代码在park(this)代码之后,是为了检测在park期间是否收到过中断 信号,当线程从park中醒来时,有两种可能:一种是其他线程调用了unpark,另一种是收到中断信号(因为对应的park是可以操作中断的),而这里的await()方法是可以响应中断的(操作后面的判断),所以当发现自己是被中断唤醒的,而不是被unpark唤醒的时,会 直接退出while循环,await()方法也会返回
5:isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面,初始的时候,Node只在Condition的队列里,而不在AQS的队列里,但执行notity(唤醒,这里只是代表唤醒的意思,而不是notity方法哦)操作的时候,会放进AQS的同步队列,因为要去判断是否释放(当被释放后,他放入队列,那么就不会继续操作阻塞了,因为!的存在,即!true就是false,所以他可以认为是判断该线程是否被释放的主要操作,即如果该队列存在,说明已经释放了,当然一般操作这里存在时,他不只是知道释放,并且也会清除释放的队列,总不能一直存在吧,主要看他是否这样操作了,你可以看里面的源码即可),自然放入队列
从上面看,我们可以发现,他也的确与wait和notity基本一样(前面也提到过),只是他可以定向的唤醒(前面也说明了,我们看源码也知道,他是根据对应是否存在来决定的,所以也就操作了定向了)
当然了不能抢占已经放入队列的锁(无论是否有操作公平)
awaitUninterruptibly()实现分析 :
与await()不同,awaitUninterruptibly()不会响应中断,其方法的定义中不会有中断异常抛出,下面分析其实 现和await()的区别
public final void awaitUninterruptibly ( ) {
Node node = addConditionWaiter ( ) ;
int savedState = fullyRelease ( node) ;
boolean interrupted = false ;
while ( ! isOnSyncQueue ( node) ) {
LockSupport . park ( this ) ;
if ( Thread . interrupted ( ) )
interrupted = true ;
}
if ( acquireQueued ( node, savedState) || interrupted)
selfInterrupt ( ) ;
}
可以看出,整体代码和 await()类似,区别在于收到异常后,不会抛出异常,而是继续执行while循环
signal()实现分析 :
public final void signal ( ) {
if ( ! isHeldExclusively ( ) )
throw new IllegalMonitorStateException ( ) ;
Node first = firstWaiter;
if ( first != null )
doSignal ( first) ;
}
private void doSignal ( Node first) {
do {
if ( ( firstWaiter = first. nextWaiter) == null )
lastWaiter = null ;
first. nextWaiter = null ;
} while ( ! transferForSignal ( first) &&
( first = firstWaiter) != null ) ;
}
final boolean transferForSignal ( Node node) {
if ( ! node. compareAndSetWaitStatus ( Node . CONDITION , 0 ) )
return false ;
Node p = enq ( node) ;
int ws = p. waitStatus;
if ( ws > 0 || ! p. compareAndSetWaitStatus ( ws, Node . SIGNAL ) )
LockSupport . unpark ( node. thread) ;
return true ;
}
与await()再次的继续操作一样,在调用 signal()的时候,必须先拿到锁(否则就会抛出上面的异常),是因为前面执行await()的时候,把锁释放了,所以若要继续操作,自然要操作获得锁,从而在对应阻塞的地方继续执行
然后,从队列中取出firstWaiter,唤醒它,但在通过调用unpark唤醒它之前,先用enq(node)方法把这个Node放入AQS的锁对应的阻塞队列中,也正因为如此,才有了await()方法里面的判断条件:while( ! isOnSyncQueue(node))
这个判断条件满⾜,说明await线程不是被中断,而是被unpark唤醒的,唤醒所有的方法(signalAll)与他类似,主要的区别就是执行了多次的transferForSignal方法
最后注意:在定义成员变量时,无论是否静态(包括静态块,其中静态块里面创建的变量是静态块的,操作完毕自动删除),若有继续利用成员变量,需要按照先后顺序
还有对应的CAS我们好像只是认为他进行了操作,但是并没有给出具体案例,若要看看如何具体操作,你可以到这里查看https://jiuaidu.com/jianzhan/1006040/(虽然前面说可能需要自带的某些操作(可能是得到偏移量的操作,一般都会直接得到),但是他这也可以是算的(虽然并没有),因为他也是我们写的,那么对于对应说明的类来说,里面也有类似的这个操作(或者CAS也是这个流程,比如在前面也说明过偏移量,如AtomicInteger类中的这个private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, “value”);就得到了偏移量),所以我也说某些自带的操作,自然也使用了Unsafe,当然对应博客里面也操作了找具体偏移量,以及比较的操作,当然这也是使用对应类Unsafe的了,大多数的相关CAS操作是由native操作的,所以就不多说了,且他解决了Unsafe使用的情况,即解决了前面说明的Unsafe使用报错的情况)
StampedLock :
为什么引入StampedLock:
StampedLock是在JDK8中新增的,有了读写锁,为什么还要引入StampedLock呢?
可以看到,从ReentrantLock到StampedLock,并发度依次提高,因为互斥的影响降低了,自然可以更快的进行操作(并发进入操作)
另一方面,因为ReentrantReadWriteLock采用的是"悲观读"的策略,当第一个读线程拿到锁之后,第二个、 第三个读线程还可以拿到锁,使得写线程一直拿不到锁,可能导致写线程"饿死",虽然通常会在其公平或⾮公平的实现 中,都尽量避免这种情形(好像并没有具体说明,了解即可),但还有可能发⽣
StampedLock引入了"乐观读"策略,读的时候不加读锁,读出来发现数据被修改了,再升级为"悲观读",相当 于降低了"读"的地位,把抢锁的天平往"写"的一方倾斜了一下,避免写线程被饿死,即偏向读锁的互斥
使用场景:
在剖析其原理之前,下面先以官方的一个例子来看一下StampedLock如何使用
package main5 ;
import java. util. concurrent. locks. StampedLock ;
public class Point {
private double x, y;
private final StampedLock sl = new StampedLock ( ) ;
void move ( double deltaX, double deltaY) {
long stamp = sl. writeLock ( ) ;
try {
x += deltaX;
y += deltaY;
} finally {
sl. unlockWrite ( stamp) ;
}
}
double distenceFromOrigin ( ) {
long stamp = sl. tryOptimisticRead ( ) ;
double currentX = x, currentY = y;
if ( ! sl. validate ( stamp) ) {
stamp = sl. readLock ( ) ;
try {
currentX = x;
currentY = y;
} finally {
sl. unlockRead ( stamp) ;
}
}
return Math . sqrt ( currentX * currentX + currentY * currentY) ;
}
}
如上面代码所示,有一个Point类,多个线程调用move()方法,修改坐标,还有多个线程调用 distanceFromOrigin()方法,求距离
⾸先,执行move操作的时候,要加写锁,这个用法和ReadWriteLock的用法没有区别,写操作和写操作也是 互斥的
关键在于读的时候,用了一个"乐观读"sl.tryOptimisticRead(),相当于在读之前给数据的状态做了一个"快照"。然后,把数据拷⻉到内存里面,在用之前,再比对一次版本号,如果版本号变了,则说明在读的期间有其他 线程修改了数据,读出来的数据废弃,重新获取读锁,关键代码就是下面这三行:
long stamp = sl. tryOptimisticRead ( ) ;
double currentX = x, currentY = y;
if ( ! sl. validate ( stamp) ) {
}
要说明的是,这三行关键代码对顺序⾮常敏感,不能有重排序,因为 state 变量已经是volatile,所以可以禁 ⽌重排序,但stamp并不是volatile的,为此,在validate(stamp)方法里面插入内存屏障
public boolean validate ( long stamp) {
VarHandle . acquireFence ( ) ;
return ( stamp & SBITS ) == ( state & SBITS ) ;
}
"乐观读"的实现原理:
⾸先,StampedLock是一个读写锁,因此也会像读写锁那样,把一个state变量分成两半,分别表示读锁和写 锁的状态,同时,它还需要一个数据的version,但是,一次CAS没有办法操作两个变量,所以这个state变量本身 同时也表示了数据的version,下面先分析state变量
public class StampedLock implements java. io. Serializable {
private static final int LG_READERS = 7 ;
private static final long RUNIT = 1L ;
private static final long WBIT = 1L << LG_READERS ;
private static final long RBITS = WBIT - 1L ;
private static final long RFULL = RBITS - 1L ;
private static final long ABITS = RBITS | WBIT ;
private static final long SBITS = ~ RBITS ;
private static final long ORIGIN = WBIT << 1 ;
private transient volatile long state;
}
如下图:用最低的8位表示读和写的状态,其中第8位表示写锁的状态,最低的7位表示读锁的状态,因为写锁 只有一个bit位,所以写锁是不可重入的
初始值不为0,而是把WBIT 向左移动了一位,也就是上面的ORIGIN 常量,构造方法如下所示
public StampedLock ( ) {
state = ORIGIN ;
}
为什么state的初始值不设为0呢,看对应乐观锁的实现:
public long tryOptimisticRead ( ) {
long s;
return ( ( ( s = state) & WBIT ) == 0L ) ? ( s & SBITS ) : 0L ;
}
public boolean validate ( long stamp) {
VarHandle . acquireFence ( ) ;
return ( stamp & SBITS ) == ( state & SBITS ) ;
}
上面两个方法必须结合起来看:当state&WBIT != 0的时候,说明有线程持有写锁(比如state对应的出现多余的1,0001 1000 0000),那么上面的tryOptimisticRead会永远返回0,这样,再调用validate(stamp),也就是validate(0)也会永远返回false,这正是我们想要的逻辑,所以当有线程持有写锁的时候,validate永远返回false,无论中间写线程是否释放了写锁,因为无论是否释放了(state回到初始值)写锁,state值都不为0,所以这个时候validate(0)永远为false,所以写与读不互斥
为什么上面的validate(…)方法不直接比较stamp=state,而要比较stamp&SBITS=state&SBITS 呢:
因为读锁和读锁是不互斥的,那么可能中间会被修改,所以如果stamp=state那么就不是true,而是false,而stamp&SBITS=state&SBITS仍然是true,满足读锁和读锁不互斥
所以,即使在"乐观读"的时候,state 值被修改了,但如果它改的是对应第7位,validate(…)还是会返回true(因为不是持有写锁了,而是读锁,即state&WBIT = 0,即&的结果是0,那么前面不是返回0,而是返回对应的值,可以观察到,返回了true)
另外要说明的一点是,上面使用了内存屏障VarHandle.acquireFence();,是因为在这行代码的下一行里面的stamp、SBITS变量不是volatile的,由此可以禁⽌其和前面的currentX=X,currentY=Y进行重排序,比如操作保证不会与写冲突,这样就不用操作写锁了(在一定的情况下,重排序是可以替换互斥锁的,因为他内部就是可以操作互斥的),通过上面的分析,可以发现state的设计⾮常巧妙,只通过一个变量,既实现了读锁、写锁的状态记录,还实现 了数据的版本号的记录
悲观读/写:"阻塞"与"自旋"策略实现差异:
同ReadWriteLock一样,StampedLock也要进行悲观的读锁和写锁操作,不过,它不是基于AQS实现的,而是 内部重新实现了一个阻塞队列,如下所示:
public class StampedLock implements java. io. Serializable {
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait;
volatile Thread thread;
volatile int status;
final int mode;
WNode ( int m, WNode p) {
mode = m;
prev = p;
}
}
private transient volatile WNode whead;
private transient volatile WNode wtail;
}
这个阻塞队列和 AQS 里面的很像
刚开始的时候,whead=wtail=NULL,然后初始化,建一个空节点,whead和wtail都指向这个空节点,之后往 里面加入一个个读线程或写线程节点
但基于这个阻塞队列实现的锁的调度策略和AQS很不一样,也就是"自旋"
一般来说在AQS里面,当一个线程CAS state失败之后,会立即加入阻塞队列(只是针对AbstractQueuedSynchronizer的对应操作),并且进入阻塞状态,但在StampedLock中,CAS state失败之后,会不断自旋,自旋⾜够多的次数之后,如果还拿不到锁,才进入 阻塞状态,为此,一般会根据CPU的核数,定义了自旋次数的常量值,如果是单核的CPU,肯定不能自旋,在多核情况下,才采 用自旋策略
private static final int NCPU = Runtime . getRuntime ( ) . availableProcessors ( ) ;
private static final int SPINS = ( NCPU > 1 ) ? 1 << 6 : 0 ;
下面以写锁的加锁,也就是StampedLock的writeLock()方法为例,来看一下自旋的实现
public long writeLock ( ) {
long next;
return ( ( next = tryWriteLock ( ) ) != 0L ) ? next : acquireWrite ( false , 0L ) ;
}
public long tryWriteLock ( ) {
long s;
return ( ( ( s = state) & ABITS ) == 0L ) ? tryWriteLock ( s) : 0L ;
}
如上面代码所示,当state&ABITS==0的时候,说明既没有线程持有读锁,也没有线程持有写锁,此时当前线 程有资格通过CAS操作state,来决定谁进入,前面的"一般来说只要修改了,那么必然会改变int c的结果"那个地方就是一个案例(因为要进行抢占,互斥的也基本如此,而不互斥的基本不会这样操作,CAS一般也是可以操作互斥的哦,因为只能一人进入,所以他的修改也是可以操作互斥的),若操作不成功,则调用acquireWrite()方法进入阻塞队列,并进行自旋,这个方法 是整个加锁操作的核⼼,代码如下:
private long acquireWrite ( boolean interruptible, long deadline) {
WNode node = null , p;
for ( int spins = - 1 ; ; ) {
long m, s, ns;
if ( ( m = ( s = state) & ABITS ) == 0L ) {
if ( ( ns = tryWriteLock ( s) ) != 0L )
return ns;
}
else if ( spins < 0 )
spins = ( m == WBIT && wtail == whead) ? SPINS : 0 ;
else if ( spins > 0 ) {
-- spins;
Thread . onSpinWait ( ) ;
}
else if ( ( p = wtail) == null ) {
WNode hd = new WNode ( WMODE , null ) ;
if ( WHEAD . weakCompareAndSet ( this , null , hd) )
wtail = hd;
}
else if ( node == null )
node = new WNode ( WMODE , p) ;
else if ( node. prev != p)
node. prev = p;
else if ( WTAIL . weakCompareAndSet ( this , p, node) ) {
p. next = node;
break ;
}
}
boolean wasInterrupted = false ;
for ( int spins = - 1 ; ; ) {
WNode h, np, pp; int ps;
if ( ( h = whead) == p) {
if ( spins < 0 )
spins = HEAD_SPINS ;
else if ( spins < MAX_HEAD_SPINS )
spins <<= 1 ;
for ( int k = spins; k > 0 ; -- k) {
long s, ns;
if ( ( ( s = state) & ABITS ) == 0L ) {
if ( ( ns = tryWriteLock ( s) ) != 0L ) {
whead = node;
node. prev = null ;
if ( wasInterrupted)
Thread . currentThread ( ) . interrupt ( ) ;
return ns;
}
}
else
Thread . onSpinWait ( ) ;
}
}
else if ( h != null ) {
WNode c; Thread w;
while ( ( c = h. cowait) != null ) {
if ( WCOWAIT . weakCompareAndSet ( h, c, c. cowait) &&
( w = c. thread) != null )
LockSupport . unpark ( w) ;
}
}
if ( whead == h) {
if ( ( np = node. prev) != p) {
if ( np != null )
( p = np) . next = node;
}
else if ( ( ps = p. status) == 0 )
WSTATUS . compareAndSet ( p, 0 , WAITING ) ;
else if ( ps == CANCELLED ) {
if ( ( pp = p. prev) != null ) {
node. prev = pp;
pp. next = node;
}
}
else {
long time;
if ( deadline == 0L )
time = 0L ;
else if ( ( time = deadline - System . nanoTime ( ) ) <= 0L )
return cancelWaiter ( node, node, false ) ;
Thread wt = Thread . currentThread ( ) ;
node. thread = wt;
if ( p. status < 0 && ( p != h || ( state & ABITS ) != 0L ) &&
whead == h && node. prev == p) {
if ( time == 0L )
LockSupport . park ( this ) ;
else
LockSupport . parkNanos ( this , time) ;
}
node. thread = null ;
if ( Thread . interrupted ( ) ) {
if ( interruptible)
return cancelWaiter ( node, node, true ) ;
wasInterrupted = true ;
}
}
}
}
}
整个acquireWrite(…)方法是两个大的for循环,内部实现了⾮常复杂的自旋策略,在第一个大的for循环里面, ⽬的就是把该Node加入队列的尾部,一边加入,一边通过CAS操作尝试获得锁,如果获得了,整个方法就会返 回,如果不能获得锁,会一直自旋,直到加入队列尾部
在第二个大的for循环里,也就是该Node已经在队列尾部了,这个时候,如果发现自己刚好也在队列头部,说 明队列中除了空的Head节点,就是当前线程了,当然此时,也会再进行新一轮的自旋,直到达到MAX_HEAD_SPINS次数(即也会继续尝试获得锁,这就是自旋的次数,总不能一直自旋吧), 然后进入阻塞,这里有一个关键点要说明:当release(…)方法(释放)被调用之后,会唤醒队列头部的第1个元素,此时会 执行第二个大的for循环里面的逻辑,也就是接着for循环里面park()方法后面的代码往下执行
另外一个不同于AQS的阻塞队列的地方是,在每个WNode里面有一个cowait指针,用于串联起所有的读线 程,例如,队列尾部阻塞的是一个读线程 1,现在又来了读线程 2、3,那么会通过cowait指针,把1、2、3串联起 来,1被唤醒之后,2、3也随之一起被唤醒,因为读和读之间不互斥
明⽩加锁的自旋策略后,下面来看锁的释放操作,和读写锁的实现类似,也是做了两件事情:一是把state变量 置回原位,二是唤醒阻塞队列中的第一个节点
@ReservedStackAccess
public void unlockWrite ( long stamp) {
if ( state != stamp || ( stamp & WBIT ) == 0L )
throw new IllegalMonitorStateException ( ) ;
unlockWriteInternal ( stamp) ;
}
private long unlockWriteInternal ( long s) {
long next; WNode h;
STATE . setVolatile ( this , next = unlockWriteState ( s) ) ;
if ( ( h = whead) != null && h. status != 0 )
release ( h) ;
return next;
}
private void release ( WNode h) {
if ( h != null ) {
WNode q; Thread w;
WSTATUS . compareAndSet ( h, WAITING , 0 ) ;
if ( ( q = h. next) == null || q. status == CANCELLED ) {
for ( WNode t = wtail; t != null && t != h; t = t. prev)
if ( t. status <= 0 )
q = t;
}
if ( q != null && ( w = q. thread) != null )
LockSupport . unpark ( w) ;
}
}
由于博文字数限制的原因,请到下一篇博文学习