阅读本文之前可以看一看 Java 多线程基础:
Java:多线程(进程线程,线程状态,创建线程,线程操作)
Java:多线程(同步死锁,锁&原子变量,线程通信,线程池)
1,内存屏障
1.1,优雅关闭
1.1.1,守护线程
在Java中,线程是运行中的代码段或函数,不能强制杀死。虽然有stop()和destroy()等函数,但官方不建议使用,因为强制终止会导致线程资源(如文件描述符、网络连接等)无法正常关闭。因此,应避免强行中断线程。合理的关闭方法是让线程自然运行完毕,释放所有资源后退出。对于不断循环运行的线程,应使用线程间通信机制,由主线程通知其退出。就需要用到线程间的通信机制,让主线程通知其退出。
【守护线程】当在一个JVM进程里面开多个线程时,这些线程被分成两类:**守护线程和非守护线程。**默认开的都是非守护线程。在Java中有一个规定:当所有的非守护线程退出后,整个JVM进程就会退出。例如,垃圾回收线程就是守护线程,它们在后台默默工作,当开发者的所有前台线程(非守护线程)都退出之后,整个JVM进程就退出了。
public class Main {
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
public void run() {
while (true) {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
} catch (Exception e) {
}
}
}
});
//设置守护线程
t1.setDaemon(true);
t1.start();
System.out.println("Main Exit");
}
}
==============================================
正常执行: 守护线程:
Main Exit Main Exit
Thread-0 Thread-0
Thread-0
Thread-0
Thread-0
设置关闭的标志位: 但还有一个问题:如果MyThread t在while循环中阻塞在某个地方,例如里面调用了 object.wait()函数,那它可能永远没有机会再执行 while(!stopped)代码,也就一直无法退出循环。
class MyThread extends Thread {
private boolean flag = false;
public void run() {
while (!flag) {
//...
}
}
public void setFlag() {
this.flag = true;
}
}
1.1.2,interrupt()
interrupt()方法:用于中断线程的执行(唤醒轻量级阻塞) 。它不是直接终止线程,而是设置线程的中断状态(每个线程都有一个中断状态,表示线程是否被中断,interrupt方法会设置线程的中断状态为true),并让线程自行决定如何响应中断。
- 线程需要自行检查和处理中断状态,通常通过在循环中定期调用Thread.interrupted()或isInterrupted()方法。
- 对于轻量级阻塞方法(如Thread.sleep()、Object.wait()和Thread.join()),如果线程被中断,这些方法会抛出InterruptedException。
Thread.interrupted()和isInterrupted()的区别:这两个函数都是线程用来判断自己是否收到过中断信号的,前者是非静态函数,后者是静态函数。前者只是读取中断状态,不修改状态;后者不仅读取中断状态,还会重置中断标志位。
轻量级阻塞:能够被中断的阻塞称为轻量级阻塞,对应的线程状态是WAITING或者TIMED_WAITING;而像 synchronized 这种不能被中断的阻塞称为重量级阻塞,对应的状态是 BLOCKED。
1.2,内存可见性
1.2.1,CPU构造
因为存在CPU缓存一致性协议,例如MESI,多个CPU之间的缓存不会出现不同步的问题,不会有“内存可见性”问题。但是,缓存一致性协议对性能有很大损耗,为了解决这个问题,CPU 的设计者们在这个基础上又进行了各种优化。例如,在计算单元和L1之间加了Store Buffer、Load Buffer(还有其他各种Buffer)
但站在操作系统内核的角度,可以统一看待这件事情。
多CPU,每个CPU多核,每个核上面可能还有多个硬件线程,对于操作系统来讲,就相当于一个个的逻辑CPU。每个逻辑CPU都有自己的缓存,这些缓存和主内存之间不是完全同步的。对应到Java里,就是JVM抽象内存模型。
1.2.2,重排序
Store Buffer的延迟写入是重排序的一种,称为内存重排序(Memory Ordering)。除此之外,还有编译器和CPU的指令重排序。
- 编译器重排序:对于没有先后依赖关系的语句,编译器可以重新调整语句的执行顺序。
- CPU指令重排序:在指令级别,让没有依赖关系的多条指令并行。
- CPU内存重排序:CPU有自己的缓存,指令的执行顺序和写入主内存的顺序不完全一致(内存可见性)。
//线程1
X=1;
a=Y;
//线程2
Y=1;
b=X;
//执行结果
a=0, b=0
a=0, b=1
a=1, b=0
a=1, b=1
虽然线程1觉得自己是按代码顺序正常执行的,但在线程2看来,a=Y和X=1顺序却是颠倒的。指令没有重排序,是写入内存的操作被延迟了,也就是内存被重排序了,这就造成内存可见性问题。
对开发者而言,当然希望不要有任何的重排序,这样理解起来最简单,指令执行顺序和代码顺序严格一致,写内存的顺序也严格地和代码顺序一致。但是,从编译器和CPU的角度来看,希望尽最大可能进行重排序,提升运行效率。
- 单线程程序的重排序规则:单线程程序的执行结果在任何重排序下都不会改变,因为编译器和CPU会确保最终结果与按顺序执行相同。这就是单线程程序的重排序规则,即as-if-serial语义。只要操作之间没有数据依赖性,编译器和CPU可以任意重排序,开发者不会察觉,也不会有内存可见性问题。
- 多线程程序的重排序规则:对于多线程程序,编译器和CPU无法完全理解线程间的复杂数据依赖性,因此只能保证每个线程的as-if-serial语义。线程间的数据依赖和相互影响需由上层确定,并告知编译器和CPU何时可以重排序,何时不能重排序。
为了在多线程场景下明确何时可以重排序,Java 引入了 Java 内存模型(JMM)。JMM 是 JVM 与开发者及编译器、CPU 之间的协定,旨在平衡开发便利性与系统运行效率。JMM 允许编译器和 CPU 灵活重排序,同时明确告知开发者哪些重排序不需要感知,哪些需要感知。如果重排序会影响程序,开发者需使用 volatile、synchronized 等同步机制来禁止重排序。
1.1.3,happen-before
happen-before:如果A happen-before B,意味着A的执行结果必须对B可见,也就是保证跨线程的内存可见性。A happen before B不代表A一定在B之前执行。因为,对于多线程程序而言,两个操作的执行顺序是不确定的。happen-before只确保如果A在B之前执行,则A的执行结果必须对B可见。定义了内存可见性的约束,也就定义了一系列重排序的约束。
基于happen-before(具有传递性)的这种描述方法,JMM对开发者做出了一系列承诺:
- 单线程中的每个操作,happen-before 对应该线程中任意后续操作(也就是 as-if-serial语义保证)。
- 对volatile变量的写入,happen-before对应后续对这个变量的读取。JMM对编译器和CPU 来说,volatile 变量不能重排序;非 volatile 变量可以任意重排序。
- 对synchronized的解锁,happen-before对应后续对这个锁的加锁。
- 对final变量的写,happen-before于final域对象的读,happen-before于后续对final变量的读。
//happen-before 的传递性
public class Test {
private int a = 0;
private volatile int c = 0;
public void set() {
a = 5; //①
c = 1; //②
}
public int get() {
int d = c; //③
return a; //④
}
public static void main(String[] args) throws InterruptedException {
Test main = new Test();
// 创建线程1来调用 set() 方法
Thread thread1 = new Thread(() -> {
main.set();
System.out.println("Thread1 finished set()");
});
// 创建线程2来调用 get() 方法
Thread thread2 = new Thread(() -> {
int result = main.get();
System.out.println("Thread2 finished get(), a = " + result);
});
// 启动线程,根据代码顺序,输出结果
thread1.start();
thread2.start();
}
}
//执行结果①:
Thread2 finished get(), a = 5
Thread1 finished set()
//执行结果②:
Thread1 finished set()
Thread2 finished get(), a = 5
假设线程A先调用了set,设置了a=5;之后线程B调用了get,返回值一定是a=5。操作1和操作2是在同一个线程内存中执行的,操作1 happen-before 操作2,同理,操作3 happen-before操作4。又因为c是volatile变量,对c的写入happen-before对c的读取,所以操作2 happen-before操作3。
同理,synchronized同样具有happen-before语义。
public class Test {
private int a = 0;
private int c = 0;
public synchronized void set() {
a = 5;
c = 1;
}
public synchronized int get() {
return a;
}
}
1.3,内存屏障
为了禁止编译器重排序和 CPU 重排序,在编译器和 CPU 层面都有对应的指令,也就是内存屏障(Memory Barrier)。这也正是JMM和happen-before规则的底层实现原理。编译器的内存屏障,只是为了告诉编译器不要对指令进行重排序。当编译完成之后,这种内存屏障就消失了,CPU并不会感知到编译器中内存屏障的存在。JDK中的内存屏障函数:
- LoadLoad:禁止读和读的重排序。
- StoreStore:禁止写和写的重排序。
- LoadStore:禁止读和写的重排序。
- StoreLoad:禁止写和读的重排序。
- loadFence=LoadLoad+LoadStore
- storeFence=StoreStore+LoadStore
- fullFence=loadFence+storeFence+StoreLoad
实现volatile关键字的语义的一种做法:
- 在volatile写操作的前面插入一个StoreStore屏障。保证volatile写操作不会和之前的写操作重排序。
- 在volatile写操作的后面插入一个StoreLoad屏障。保证volatile写操作不会和之后的读操作重排序。
- 在volatile读操作的后面插入一个LoadLoad屏障+LoadStore屏障。保证volatile读操作不会和之后的读操作、写操作重排序。
具体到x86平台上,其实不会有LoadLoad、LoadStore和StoreStore重排序,只有StoreLoad一种重排序(内存屏障),也就是只需要在volatile写操作后面加上StoreLoad屏障。
1.4,final关键字
public class Test {
private int i;
private int j;
private static Test test;
public Test() {
i = 1;
j = 2;
}
public static void write() {
test = new Test();
}
public static void read() {
if (test != null) {
int a = test.i;
int b = test.j;
System.out.println("a=" + a + ",b=" + b);
}
}
public static void main(String[] args) throws InterruptedException {
Test main = new Test();
Thread thread1 = new Thread(() -> {
main.write();
});
Thread thread2 = new Thread(() -> {
main.read();
});
thread1.start();
thread2.start();
}
}
答案:a,b未必一定等于1,2(输出可能为空)。构造函数溢出问题。obj=new Example()这行代码,分解成三个操作:
- ① 分配一块内存;
- ② 在内存上初始化i=1,j=2;
- ③ 把obj指向这块内存。
操作②和操作③可能重排序,因此线程B可能看到未正确初始化的值。对于构造函数溢出,通俗来讲,就是一个对象的构造并不是“原子的”,当一个线程正在构造对象时,另外一个线程却可以读到未构造好的“一半对象”。
解决办法:
- 给i,j都加上volatile关键字。
- 为read/write函数都加上synchronized关键字。
- 如果i,j只需要初始化一次,则后续值就不会再变了,为其加上final关键字。
之所以能解决问题,是因为同volatile一样,final关键字也有相应的happen-before语义:
- 对final域的写(构造函数内部),happen-before于后续对 final域所在对象的读。
- 对final域所在对象的读,happen-before于后续对final域 的读。
2,Atomic类
2.1,AtomicInteger和CAS函数
2.1.1,乐观锁和悲观锁
对于一个整数的加减操作,要保证线程安全,需要加锁,也就是加synchronized关键字。(代码层面同:synchronized具有happen-before语义示例代码)但有了Concurrent包的Atomic相关的类之后,synchronized关键字可以用AtomicInteger代替,其代码更简洁,性能更好。
public class Test {
private AtomicInteger count = new AtomicInteger(0);
public void add() {
//自增
count.getAndIncrement();
}
public void decr() {
//自减
count.getAndDecrement();
}
}
数据库中的锁:悲观锁和乐观锁
【悲观锁】数据发生并发冲突的概率很大,所以读操作之前就上锁。如synchronized,ReentrantLock。
【乐观锁】数据发生并发冲突的概率比较小,所以读操作之前不上锁。等到写操作的时候,再判断数据在此期间是否被其他线程修改了。如果被其他线程修改了,就把数据重新读出来,重复该过程;如果没有被修改,就写回去。判断数据是否被修改,同时写回新值,这两个操作要合成一个原子操作,也就是CAS(Compare And Set)。如AtomicInteger。
2.1.2,CAS
CAS函数:封装的Unsafe类中的一个native函数。AtomicInteger封装过的compareAndSet有两个参数。第一个参数expect是指变量的旧值(是读出来的值,写回去的时候,希望没有被其他线程修改,所以称为expect);第二个参数update是指变量的新值(修改过的,希望写入的值)。当expect等于变量当前的值时,说明在修改的期间,没有其他线程对此变量进行过修改,所以可以成功写入,变量被更新为update,返回true;否则返回false。
public class AtomicInteger extends Number implements java.io.Serializable {
...
//封装了一个int变量,对其进行CAS操作
private volatile int value;
...
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
}
2.1.3,Unsafe
Unsafe类是整个Concurrent包的基础,里面所有函数都是native的。进一步拆解:
public final class Unsafe {
...
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
...
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
}
compareAndSwapInt(Object var1, long var2, int var4, int var5) 函数有4个参数。在前两个参数中,第一个是对象(也就是 AtomicInteger 对象),第二个是对象的成员变量(也就是AtomictInteger里面包的int变量value),后两个参数保持不变。要特别说明一下第二个参数,它是一个long型的整数,经常被称为xxxOffset,意思是某个成员变量在对应的类中的内存偏移量(该变量在内存中的位置),表示该成员变量本身。在Unsafe中专门有一个函数,把成员变量转化成偏移量。
public native long objectFieldOffset(Field var1);
所有调用CAS的地方,都会先通过这个函数把成员变量转换成一个Offset。无论是Unsafe还是valueOffset,都是静态的,所有对象共用的。在转化的时候,先通过反射(getDeclaredField)获取value成员变量对应的Field对象,再通过objectFieldOffset函数转化成valueOffset。此处的valueOffset就代表了value变量本身,后面执行CAS操作的时候,不是直接操作value,而是操作valueOffset。
public class AtomicInteger extends Number implements java.io.Serializable {
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
}
2.2,其他常见函数
2.2.1,AtomicBoolean和AtomicReference
对于int或者long型变量,需要进行加减操作,所以要加锁;对于一个boolean类型来说,需要进行一个原子的比较并设置操作(compare-and-set)。
AtomicBoolean atomicFlag = new AtomicBoolean(false);
// Thread 1
atomicFlag.set(true);
// Thread 2
if (atomicFlag.compareAndSet(true, false)) {
...
}
同样地,AtomicReference也需要同样的功能:
import java.util.concurrent.atomic.AtomicReference;
class MyObject {
int value;
}
// 使用AtomicReference
AtomicReference<MyObject> atomicMyObject = new AtomicReference<>(new MyObject());
// Thread 1
atomicMyObject.set(new MyObject());
// Thread 2
MyObject expected = atomicMyObject.get();
MyObject newObject = new MyObject();
if (atomicMyObject.compareAndSet(expected, newObject)) {
newObject.value = 10;
}
在Unsafe类中,只提供了三种类型的CAS操作:int、long、Object(也就是引用类型)。
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
第一个参数是要修改的对象,第二个参数是对象的成员变量在内存中的位置(一个long型的整数),第三个参数是该变量的旧值,第四个参数是该变量的新值。
- 对于用int型来代替的,在入参的时候,将boolean类型转换成int类型;在返回值的时候,将int类型转换成boolean类型。
public class AtomicBoolean implements java.io.Serializable {
public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
}
- 对于double类型,需要用到double类型提供的一对double类型和long类型互转的函数。
public final class Double extends Number implements Comparable<Double> {
public static native double longBitsToDouble(long bits);
public static native long doubleToRawLongBits(double value);
}
2.2.2,AtomicStampedReference&AtomicMarkableReference
ABA 问题指的是一个位置的值被读取两次,中间被改动后又改回原值,使得第二次读取时无法感知到中间发生的变化。通过附加一个标记(版本号),AtomicStampedReference 能有效避免这种情况。
public class AtomicStampedReference<V> {
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
}
之前的 CAS只有两个参数,这里的 CAS有四个参数,后两个参数就是版本号的旧值和新值。当expectedReference != 对象当前的reference时,说明该数据肯定被其他线程修改过;当expectedReference == 对象当前的reference时,再进一步比较expectedStamp是否等于对象当前的版本号,以此判断数据是否被其他线程修改过。
【问题】为什么没有AtomicStampedInteger或AtomicStampedLong?因为这里要同时比较数据的“值”和“版本号”,而Integer型或者Long型的CAS没有办法同时比较两个变量,于是只能把值和版本号封装成一个对象,也就是 Pair 内部类,然后通过对象引用的CAS来实现。
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
}
当使用的时候,在构造函数里面传入值和版本号两个参数,应用程序对版本号进行累加操作,然后调用上面的CAS。
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
AtomicMarkableReference与AtomicStampedReference原理类似,只是Pair里面的版本号是boolean类型,不是整型的累加变量。因为是boolean类型,只能有true、false 两个版本号,所以并不能完全避免ABA问题,只是降低了ABA发生的概率。
public class AtomicMarkableReference<V> {
private static class Pair<T> {
final T reference;
final boolean mark;
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
}
}
private volatile Pair<V> pair;
}
2.2.3,AtomicIntegerFieldUpdater,AtomicLongFieldUpdate和AtomicReferenceFieldUpdater
如果一个类是自己编写的,则可以在编写的时候把变量定义为Atomic类型。但如果是一个已经有的类,在不能更改源代码的情况下,要想实现对其他成员变量的原子操作,就需要AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater。
AtomicIntegerFieldUpdater是一个抽象类。首先,其构造方法是protected,不能直接构造其对象,必须通过它提供的的一个静态方法来创建,如下:
public abstract class AtomicIntegerFieldUpdater<T> {
protected AtomicIntegerFieldUpdater() {
}
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}
}
newUpdater(…)静态方法传入的是要修改的类(不是对象)和对象的成员变量的名字,内部通过反射拿到这个类的成员变量,然后包装成一个AtomicIntegerFieldUpdater。所以,这个对象表示的是类的某个成员,而不是对象的成员变量。
若要修改某个对象的成员变量的值,再传入相应的对象,如下:
public abstract class AtomicIntegerFieldUpdater<T> {
public int getAndIncrement(T obj) {
int prev, next;
do {
prev = get(obj);
next = prev + 1;
} while (!compareAndSet(obj, prev, next));
return prev;
}
public final boolean compareAndSet(T obj, int expect, int update) {
accessCheck(obj);
return U.compareAndSwapInt(obj, offset, expect, update);
}
}
accessCheck方法的作用是检查该obj是不是tclass类型,如果不是,则拒绝修改,抛出异常。从代码上看,其CAS原理和AtomicInteger是一样的,底层都调用了Unsafe的compareAndSetInt(…)方法。
要想使用AtomicIntegerFieldUpdater修改成员变量,成员变量必须是volatile的int类型(不能是Integer包装类),该限制从其构造方法中可以看到:
if (field.getType() != int.class) throw new IllegalArgumentException("Must be integer type");
if (!Modifier.isVolatile(modifiers)) throw new IllegalArgumentException("Must be volatile type");
2.2.4,AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray
Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray三个数组元素的原子操作。注意,这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操作而言。
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
相比于AtomicInteger的getAndIncrement()方法,这里只是多了一个传入参数:数组的下标 i。
其底层的CAS函数用的还是compareAndSwapInt,但是把数组下标 i 转化成对应的内存偏移量。
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}
把下标 i 转换成对应的内存地址,用到了 shift 和 base 两个变量。这两个变量都是AtomicIntegerArray的静态成员变量,用了Unsafe类的arrayBaseOffset和arrayIndexScale两个函数来获取。
private static final int base = unsafe.arrayBaseOffset(int[].class);
static {
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
其中,base表示数组的首地址的位置,scale表示一个数组元素的大小,i的偏移量则等于:i * scale+base。但为了优化性能,使用了位移操作,shift表示scale中1的位置(scale是2的整数次方)。所以,偏移量的计算变成上面代码中的:i<;<;shift+base,表达的意思就是:i*scale+base。
public final int getAndAdd(int i, int delta) {
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
}
2.3,Striped64与LongAdder
从JDK 8开始,针对Long型的原子操作,Java又提供了LongAdder、LongAccumulator;针对Double类型,Java提供了DoubleAdder、DoubleAccumulator。Striped64相关的类的继承层次如下:
2.3.1,LongAdder原理
AtomicLong内部是一个volatile long型变量,由多个线程对这个变量进行CAS操作。多个线程同时对一个变量进行CAS操作,在高并发的场景下仍不够快。LongAdder 把一个变量拆分成多份,变为多个变量,有些类似于ConcurrentHashMap的分段锁的例子。把一个Long型拆成一个base变量外加多个Cell,每个Cell包装一个Long型变量。当多个线程并发累加的时候,如果并发度低,就直接加到base变量上;如果并发度高,冲突大,平摊到这些Cell上。在最后取值的时候,再把base和这些Cell求sum运算。
public class LongAdder extends Striped64 implements Serializable {
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
}
由于无论是long,还是double,都是64位的。但因为没有double型的CAS操作,所以是通过把double型转换为long型来实现的。所以,上面的base和cell[]变量,是位于基类Striped64当中的。英文Striped意为“条带”,也就是分片。
2.3.2,最终一致性
在sum求和方法中,并没有对cells[]数组加锁。也就是说,一边有线程对其执行求和操作,一边还有线程修改数组里的值,也就是最终一致性,而不是强一致性。这类似于ConcurrentHashMap中的clear()方法,一边执行清空操作,一边还有线程放入数据,clear()方法调用完毕后再读取,hashmap里面可能还有元素。因此,LongAdder适合高并发的统计场景,而不适合要对某个Long型变量进行严格同步的场景。
2.3.3,伪共享与缓存行填充
在Cell类中的定义中,用了一个独特的注解@sun.misc.Contended,这是JDK 8 之后才有的,背后涉及一个很重要的优化原理:伪共享与缓存行填充。每个CPU都有自己的缓存。缓存与主板进行数据交换的基本单位叫Cache Line(缓存行)。在64位x86架构中,缓存行是64字节,也就是8个Long型的大小。这也意味着当缓存失效,要刷新到主内存的时候,最少要刷新64字节。
【案例】假设主内存中有变量X、Y、Z(假设每个变量都是一个Long型),被CPU1和CPU2分别读入自己的缓存,放在了同一行Cache Line里面。当CPU1修改了X变量,它要失效整行Cache Line,也就是往总线上发消息,通知CPU2对应的Cache Line失效。由于Cache Line是数据交换的基本单位,无法只失效X,要失效就会失效整行的Cache Line,这会导致Y、Z变量的缓存也失效。
虽然只修改了X变量,本应该只失效X变量的缓存,但Y、Z变量也随之失效。Y、Z变量的数据没有修改,本应该很好地被CPU1和CPU2共享,却没做到,这就是所谓地“伪共享问题”。
问题的原因是:Y、Z和变量X处在同一行Cache Line里面。要解决这个问题,需要用到所谓的“缓存行填充”,分别再X、Y、Z后面加上7个无用的Long型,填充整个缓存行,让X、Y、Z处在三行不同的缓存行中,如下图所示:
声明一个@jdk.internal.vm.annotation.Contended/@sun.misc.Contended即可实现缓存行的填充。之所以这个地方要用缓存行填充,是为了不让Cell[]数组中相邻元素落到同一个缓存行里。
2.3.4,LongAdder核心
LongAdder最核心的累加方法add(long x),自增、自减操作都是通过调用该方法实现的:
当一个线程调用add(x)的时候,首先会尝试使用caseBase把x加到base变量上。如果不成功,则再用c.cas(…)方式尝试把x加到Cell数组的某个元素上。如果还不成功。最后在调用longAccumulate(…)方法。
注意:Cell[]数组的大小始终是2的整数次方,在运行中会不断扩容,每次扩容都是增长2倍。上面代码中的cs[getProbe() & m]其实就是对数组的大小取模。因为m=cs.length-1,getProbe()为该线程生成一个随机数,用该随机数对数组长度取模。因为数组长度是2的整数次方,所以可以用&操作来优化取模运算。
对于一个线程来说,它并不在意到底是把x累加到base上,还是累加到Cell[]数组上,只要累加成功就可以。因此,这里使用随机数来实现Cell的长度取模。如果两次尝试都不成功,则调用longAccumulate(…)方法,该方法在Striped64里面,LongAccumulator也会用到。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// true表示最后一个slot非空
boolean collide = false; // True if last slot nonempty
done: for (;;) {
Cell[] cs; Cell c; int n; long v;
// 如果cells不是null,且cells长度大于0
if ((cs = cells) != null && (n = cs.length) > 0) {
// cells最大下标对随机数取模,得到新下标。
if ((c = cs[(n - 1) & h]) == null) {
// 自旋锁标识,用于创建cells或扩容cells
if (cellsBusy == 0) { // 尝试添加新的Cell
Cell r = new Cell(x); // Optimistically create
// 如果cellsBusy为0,则CAS操作cellBusy为1,获取锁
if (cellsBusy == 0 && casCellsBusy()) {
try { // 获取锁之后,再次检查
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 赋值成功,返回
rs[j] = r;
break done;
}
} finally {
// 重置标志位,释放锁
cellsBusy = 0;
}
continue; // 如果Slot非空,则进入下一次循环
}
}
collide = false;
}
else if (!wasUncontended) // CAS操作失败
wasUncontended = true; // rehash之后续
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs) // 扩容,每次都是上次的两倍长度
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
try { // Initialize table
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
break done;
}
} finally {
cellsBusy = 0;
}
}
// Fall back on using base
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break done;
}
}
2.3.5,LongAccumulator
LongAccumulator的原理和LongAdder类似,只是功能强大,下面为两者构造方法的对比:
public LongAdder() {}
public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
LongAdder只能及逆行累加操作,并且初始值默认为0;LongAccumulator可以自己定义一个二元操作符,并且可以传入一个初始值。
@FunctionalInterface
public interface LongBinaryOperator {
/**
* Applies this operator to the given operands.
*
* @param left base变量或者Cell[]中元素的当前值
* @param right add()方法传入的参数x
* @return the operator result
*/
long applyAsLong(long left, long right);
}
LongAccumulator的accumulate(x)方法,与LongAdder(x)方法类似,最后都是调用Striped64的longAccumulate(…)方法。唯一的差别就是LongAdder的add(x)方法调用的是caseBase(b, b+x),这里调用的是caseBase(b, r),其中r=function.applyAsLong(b=base,x)。
DoubleAdder其实也是用long型实现的,因为没有double类型的CAS方法。DoubleAdder的add(x)方法和LongAdder的add(x)方法基本一样,只是多了long和double类型的相互转换:Double.doubleToRawLongBits(Double.longBitsToDouble(v) + x) 。