第四章java并发包中原子操作类原理剖析
JUC包提供了一系列的原子类操作,这些类都是使用非阻塞算法CAS实现的,相比使用锁实现原子操作在性能上有很大提高
本章只讲解最简单的AtomicLong类的实现原理以及JDK8中新增的LongAdder和LongAccumulator类的原理
原子变量的操作类
AtomicLong,AtomicInteger,AtomicBoolean等原子类操作类,内部使用Unsafe来实现
public class AtomicLongTest extends Number implements Serializable {
private static final long serialVersionUID= 1927816293512124184L;
private static final Unsafe unsafe =Unsafe.getUnsafe();
private static final long valueOffset;
static final boolean VM_SUPPORTS_LONG_CAS= VMSupportsCS8();
private static native boolean VMSupportsCS8();
static {
try{
valueOffset = unsafe.objectFieldOffset(AtomicLongTest.class.getDeclaredField("value"));
}catch (Exception e){
throw new Error(e);
}
}
private volatile long value;
private AtomicLongTest(long initiaValue){
value=initiaValue;
}
@Override
public int intValue() {
return 0;
}
@Override
public long longValue() {
return 0;
}
@Override
public float floatValue() {
return 0;
}
@Override
public double doubleValue() {
return 0;
}
}
递增和递减代码操作
boolean compareAndSet(long expect,long update)
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
内部还是调用了 unsafe.compareAndSwapLong方法,如果原子变量中的value值等于expect,则使用update值更新该值并返回true,否则返回false
public class AtomicTest {
private static AtomicLong atomicLong=new AtomicLong();
private static Integer[] arrayOne=new Integer[]{0,1,2,3,4,5,6,7,56,0};
private static Integer[] arrayTwo=new Integer[]{10,1,2,3,4,5,6,0,56,0};
public static void main(String[] args) throws InterruptedException{
Thread threadOne=new Thread(new Runnable() {
@Override
public void run() {
int size = arrayOne.length;
for(int i=0;i<size;i++){
if(arrayOne[i].intValue()==0){
atomicLong.incrementAndGet();
}
}
}
});
Thread threadTwo=new Thread(new Runnable() {
@Override
public void run() {
int size = arrayTwo.length;
for(int i=0;i<size;i++){
if(arrayTwo[i].intValue()==0){
atomicLong.incrementAndGet();
}
}
}
});
threadOne.start();
threadTwo.start();
threadOne.join();
threadTwo.join();
System.out.println("count 0:"+atomicLong.get());
}
}
JDK8新增的原子操作类LongAdder()方法
简单介绍
前面提到过了AtomicLong是通过CAS提供的非阻塞的原子性操作,相比阻塞算法的同步器来说性能已经很好了,但是在高并发下大量线程同时去竞争同一个原子变量,由于同时只有一个线程凯跃操作成功,这样就造成了大量线程竞争失败后,会通过无限循环不断进行自选操作尝试CAS,白白浪费了CPU资源
使用LongAddr时候会在内部维护多个Cell变量,每个Cell里面有一个初始为零的long变量,在同等的并发量下,争夺单个变量更新操作的线程会减小,变相的减少了争夺共享资源的并发量
当多个线程在争夺同一个cell变量失败后,并不是在当前的cell变量上一直自旋CAS重试,而是在其他Cell变量上尝试进行CAS,这个改变增加了当前线程重试CAS的成功的可能性,最后在获取LongAdder的值的时候,是把所有的Cell变量的值累加再加上Base返回的
LongAdder 维护了一个延迟初始化的原子性更新数组 (默认情况下 Cell 数组是 nu和一个基值变量 base。由于 Cells 占用的内存是相对比较大的,所以一开始并不创建它而是在需要时创建,也就是惰性加载。
当一开始判断 Cell 数组是 null 并且并发线程较少时,所有的累加操作都是对 base变量进行的。保持 Cell 数组的大小为2的N次方,在初始化时 Cel 数组中的 Cel元素个数为2,数组里面的变量实体是 Cell 类型。Cell 类型是 AtomicLong 的一个改进,用来减少缓存的争用,也就是解决伪共享问题。
对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子性操作都是无规律地分散在内存中的(也就是说多个原子性变量的内存地址是不连续的),多个原子变量被放入同一个缓存行的可能性很小。但是原子性数组元素的内存地址是连续的,所以数组内的多个元素能经常共享缓存行,因此这里使用 @sun.misc.Contended 注解对 Cell类进行字节填充,这防止了数组中多个元素共享一个缓存行,在性能上是一个提升。
代码分析
- (1)LongAdder 的结构是怎样的?
- (2)当前线程应该访问 Cell 数组里面的哪一个 Cell 元素?
- (3)如何初始化 Cell 数组?
- (4) Cell 数组如何扩容?
- (5) 线程访问分配的 Cel 元素有冲突后如何处理?
- (6)如何保证线程操作被分配的 Cell 元素的原子性?
cell的构造
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
可以看到,Cell 的构造很简单,其内部维护一个被声明为 volatile 的变量,这里声为volatile是因为线程操作 value 变量时没有使用锁,为了保证变量的内存可见性这里格声明为volatie的。另外cas 函数通过CAS 操作,保证了当前线程更新时被分配的Cell元素中value值的原子性。另外,Cell 类使用@sun.misc.Contended 修饰是为了避免伪共享
- ·long sum() 返回当前的值,内部操作是累加所有 Cell内部的 value 值后再累加 bas例如下面的代码,由于计算总和时没有对 Cell 数组进行加锁,所以在累加过程可能有其他线程对Cell 中的值进行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是一个调用 sum 方法时的原子快照值
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
- void reset() 为重置操作,如下代码把 base 置为 0,如果 Cell 数组有元素,则元素值被重置为0。
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
- long sumThenReset0是 sum 的改造版本,如下代码在使用sum累加对应的Cell值后把当前 Cell 的值重置为 0,base 重置为0。这样,当多线程调用该方法时会有问题比如考虑第一个调用线程清空 Cell 的值,则后一个线程调用时累加的都是0值。
public long sumThenReset() {
Cell[] as = cells; Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
long longValue0等价于 sum0)。
- 下面主要看下 add 方法的实现,从这个方法里面就可以找到其他问题的答案。
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
final boolean casBase(long cmp, long val){
return UNSAFE.compareAndSwapLong(this,BASE,cmp, val);
}
代码 (1)首先看 cells 是否为null,如果为 null 则当前在基础变量 base 上进行累加,这时候就类似AtomicLong 的操作。
如果cells 不为 null 或者线程执行代码(1)的 CAS 操作失败了,则会去执行代码(2),代码(2)(3)决定当前线程应该访问 cells 数组里面的哪一个 Cell 元素,如果当前线程射的元素存在则执行代码 (4),使用CAS 操作去更新分配的 Cell 元素的 value 值,如当前线程映射的元素不存在或者存在但是 CAS 操作失败则执行代码(5)。其实将代码(2(3)(4)合起来看就是获取当前线程应该访问的 cells 数组的 Cell 元素,然后进行 CAS更新操作,只是在获取期间如果有些条件不满足则会跳转到代码(5) 执行。另外当前线程应该访问 cells数组的哪一个Cell元素是通过getProbe0)& m进行计算的,其中m是当cells 数组元素个数 -1,getProbe0 则用于获取当前线程中变量 threadLocalRandomProbe值,这个值一开始为0,在代码(5)里面会对其进行初始化。并且当前线程通过分配的Cell 元素的cas 函数来保证对 Cell 元素 value 值更新的原子性,到这里我们回答了问题2和问题6。
该代码为cells数组初始化和扩容的代码
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
在被初始化或者扩容,或者当前在创建新的 Cell 元素、通过 CAS 操作来进行0或 1状态的切换,这里使用 casCellsBusy 函数。假设当前线程通过 CAS 设置 ellsBusy 为1,则前线程开始初始化操作,那么这时候其他线程就不能进行扩容了。如代码 《14.1)初始化cells 数组元素个数为 2,然后使用 h&1 计算当前线程应该访 el 数组的哪个位置,也就是使用当前线程的 threadLocalRandomProbe 变量值& (cells 数组元素个数 -1),然后标示cels 数组已经被初始化,最后代码 (14.3) 重置了 ellsBusy 标记。显然这里没有使用CAS 操作,却是线程安全的,原因是 cellsBusy 是 volatile 类型的,这保证了变量的内存可见性,另外此时其他地方的代码没有机会修改 cellsBusy 的值。在这里初始化的 cells 数组里面的两个元素的值目前还是 null。这里回答了问题 3,知道了 cells 数组如何被初始化。
cells数组的扩容是在代码(12)中进行的,对 cells 扩容是有条件的,也就是代码(10)(11)的条件都不满足的时候。具体就是当前 cells 的元素个数小于当前机器 CPU 个数并目当前多个线程访问了 cells 中同一个元素,从而导致冲突使其中一个线程 CAS 失败时才会进行扩容操作。这里为何要涉及 CPU 个数呢?其实在基础篇中已经讲过,只有当每个 CPU都运行一个线程时才会使多线程的效果最佳,也就是当 cells 数组元素个数与 CPU个数一致时,每个 Cell 都使用一个 CPU 进行处理,这时性能才是最佳的。代码(12)中的扩容操作也是先通过 CAS 设置 cellsBusy 为1,然后才能进行扩容。假设 CAS 成功则执行代码(12.1)将容量扩充为之前的 2 倍,并复制 Cell 元素到扩容后数组。另外,扩容后 cells 数组里面除了包含复制过来的元素外,还包含其他新元素,这些元素的值目前还是 null。这里回答了问题4。
在代码(7)(8)中,当前线程调用add 方法并根据当前线程的随机数threadLocalRandomProbe 和 cells 元素个数计算要访问的 Cell元素下标,然后如果发现对应下标元素的值为 null,则新增一个 Cell 元素到 cells 数组,并且在将其添加到 cells 数组之前要竞争设置 cellsBusy 为 1。
代码(13)对CAS失败的线程重新计算当前线程的随机值 threadLocalRandomProbe,以减少下次访问 cells 元素时的冲突机会。这里回答了问题 5。
小结
介绍了新增的JDK8中新增的LongAdder原子操作类,该类通过内部cells数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量,让多个线程可以同时对cells数组的元素进行操作,数组元素cell使用
@sun.misc.Contended注解进行修饰,这避免了cells数组内多个原子变量被放入同一个缓存行,也就避免了伪共享
LongAccumulator类原理探究
LongAdder 类是 LongAccumulator 的一个特例,LongAccumulator 比 LongAdder 的更强大。
其中accumulatorFunction 是一个双目运算器接口,其根输入的两个参数返回一个计算值,identity 则是 LongAccumulator 累加器的初始值
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
调用 LongAdder 就相于使用下面的方式调用 LongAccumulator:
调用 LongAdder 就相于使用下面的方式调用 LongAccumulator:
public interface LongBinaryOperator{
long applyAsLong(long left,long right);
}
LongAdder adder new LongAdder();
LongAccumulator accumulator =new LongAccumulator(new LongBinaryOperator(){
@Override
public long applyhsLong(long left, long right) {
return left + right;
}
},0);
LongAccumulator 相比于 LongAdder,可以为累加器提供非 0的初始值,后者只能提供默认的0值。另外,前者还可以指定累加规则,比如不进行累加而进行相乘,只需要在构造LongAccumulator时传入自定义的双目运算器即可,后者则内置累加的规则。
从下面代码我们可以知道,LongAccumulator 相比于 LongAdder 的不同在于,在调用caseBase时,后者传递的是b+x,前者使用了r=function.ApplyAsLong(b=base,x)来计算
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
public void accumulate(long x) {
Cell[] as; long b, v, r; int m; Cell a;
if ((as = cells) != null ||
(r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended =
(r = function.applyAsLong(v = a.value, x)) == v ||
a.cas(v, r)))
longAccumulate(x, function, uncontended);
}
}
前者调用的时候传递的是function,后者是null
当fn为null时候就使用v+x加法运算,这时候等价于LongAdder,当fn不为null时候则使用传递fn函数计算
else if (caseBase(v = base, ((fn = null) ? v + x : fn.applyAsLong(v, x))))
break;
}
总结:本节简单介绍了LongAccumluator的原理,LongAdder是LongAccumluator的一个特例,只是后者提供更加强大的功能,可以让用户自定义累加规则
总结
本章介绍了并发包中的原子性操作类,这些类都是使用非阻塞算法 CAS 实现的,这相比使用锁实现原子性操作在性能上有很大提高。首先讲解了最简单的AtomicLong 类的实现原理,然后讲解了JDK 8中新增的 LongAdder 类和 LongAccumulator 类的原理。学习完本章后,希望读者在实际项目环境中能因地制宜地使用原子性操作类来提升系统性能。