无锁三大将:CAS & Unsafe & Atomic
文章目录
- 无锁三大将:CAS & Unsafe & Atomic
- 一:CAS机制
- 二:Unsafe魔法指针类
- 2.1:内存管理
- 2.2:对象创建实例
- 2.3:类,实例对象以及变量操作
- 2.4:数组操作
- 2.5:CAS相关操作
- 2.6:线程操作
- 2.7:内存屏障
- 三:Atomic原子包
- 3.1:基本类型原子操作类
- 3.2:引用类型原子操作类
- 3.3:数组原子引用类
- 3.4:属性更新原子操作类
- 3.5:LongAdder和AtomicLong
- 3.6:ABA问题和原子标记引用
- 3.6.1:ABA问题
- 3.6.2:ABA问题的解决方案
一:CAS机制
当一个线程想要执行被synchronized修饰的代码/方法,为了避免操作共享资源时发生冲突,每次都需要执行加锁策略,而无锁则总是假设对共享资源的访问没有冲突,线程可以不停执行,无需加锁,无需等待
一旦发现冲突,无锁策略将采用一种称为CAS的技术来保证线程执行的安全性,也就是说CAS技术就是无锁策略实现的关键。
CAS全称Compare And Swap(比较并交换),而Java中的CAS实现最终也是依赖于CPU的原子性指令实现
在CAS机制中核心思想为:CAS(V, E, N) -> V:需要操作的共享变量; E:预期值;N:新值
工作过程如下:
由于CAS操作属于乐观派,每次线程操作时都认为自己可以成功执行,当多个线程同时使用CAS操作一个变量时,只有一个会成功执行并成功更新,其余均会失败
但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作
基于这样的原理,CAS机制即使没有锁,同样也能够得知其他线程对共享资源进行了操作并执行相应的处理措施
同时,因为CAS无锁,所以天然避免了死锁问题
而且因为CAS依赖于CPU的原子性指令实现,所以不会出现多个线程在同时做CAS操作时的安全问题而造成不一致,因为原子性指令无法被中断
二:Unsafe魔法指针类
Unsafe类位于sun.misc包中,提供的功能十分强大,但是确实存在些许不安全【这就是为什么叫unsafe的原因】
Unsafe内部方法操作可以像C的指针一样直接操作内存,而能直接操作内存时也就凸显出此类的不安全性,意味着:
- 不受JVM管理,也就代表着无法被GC,需要我们手动释放内存,当你使用这个类做了一些操作稍有不慎就会出现内存泄漏
- Unsafe类中的不少方法中必须提供原始地址(内存地址)和被替换对象的地址,偏移量要自己计算,一旦出现问题就是JVM崩溃级别的错误,会导致整个Java程序崩溃,表现为应用进程直接crash掉
但是通过Unsafe类直接操作内存,也意味着其速度会比普通Java程序更快,在高并发的条件之下能够很好地提高效率
因此,从上面几个角度来看,虽然在一定程度上提升了效率但是也带来了指针的不安全性,Unsafe名副其实。
所以我们在编写程序时如果没有什么特殊要求不应该考虑使用它,并且Java官方也不推荐使用
Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务
关于Unsafe类的主要功能点如下:
- 类(Class)相关:提供Class和它的静态域操纵方法
- 信息(Info)相关:返回某些低级别的内存信息
- 数组(Arrays)相关:提供数组操纵方法
- 对象(Objects)相关:提供Object和它的域操纵方法
- 内存(Memory)相关:提供直接内存访问方法(绕过JVM堆直接操纵本地内存)
- 同步(Synchronization)相关:提供低级别同步原语、线程挂起/放下等操纵方法
只有由主类加载器加载的类才能调用这个方法
public static Unsafe getUnsafe() {
// 得到调用的加载器
Class localClass = Reflection.getCallerClass();
// 如果不是主类加载器加载的类,将抛出异常
if(!VM.isSystemDomainLoader(localClass.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
// 可以调用
return theUnsafe;
}
}
🎉但通过万能的反射,还是可以使用到Unsafe类的:
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);
2.1:内存管理
//分配内存指定大小的内存
public native long allocateMemory(long bytes);
//根据给定的内存地址address设置重新分配指定大小的内存
public native long reallocateMemory(long address, long bytes);
//用于释放allocateMemory和reallocateMemory申请的内存
public native void freeMemory(long address);
//将指定对象的给定offset偏移量内存块中的所有字节设置为固定值
public native void setMemory(Object o, long offset, long bytes, byte value);
//设置给定内存地址的值
public native void putAddress(long address, long x);
//获取指定内存地址的值
public native long getAddress(long address);
//设置给定内存地址的long值
public native void putLong(long address, long x);
//获取指定内存地址的long值
public native long getLong(long address);
//设置或获取指定内存的byte值
public native byte getByte(long address);
public native void putByte(long address, byte x);
//其他基本数据类型(long,char,float,double,short等)的操作与putByte及getByte相同
.......... 省略代码
//操作系统的内存页大小
public native int pageSize();
2.2:对象创建实例
在之前我们创建类对象实例时无非通过两种形式:new以及反射机制创建
但是无论是new还是反射的形式创建都会调用对象的构造方法来完成对象的初始化,而Unsafe类提供创建对象实例新的途径如下:
//传入一个对象的class并创建该实例对象,但不会调用构造方法
public native Object allocateInstance(Class cls) throws InstantiationException;
2.3:类,实例对象以及变量操作
//获取字段f在实例对象中的偏移量
public native long objectFieldOffset(Field f);
//静态属性的偏移量,用于在对应的Class对象中读写静态属性
public native long staticFieldOffset(Field f);
//返回值就是f.getDeclaringClass()
public native Object staticFieldBase(Field f);
//获得给定对象偏移量上的int值,所谓的偏移量可以简单理解为指针指向该变量的内存地址,
//通过偏移量便可得到该对象的变量,进行各种操作
public native int getInt(Object o, long offset);
//设置给定对象上偏移量的int值
public native void putInt(Object o, long offset, int x);
//获得给定对象偏移量上的引用类型的值
public native Object getObject(Object o, long offset);
//设置给定对象偏移量上的引用类型的值
public native void putObject(Object o, long offset, Object x);
//其他基本数据类型(long,char,byte,float,double)的操作与getInthe及putInt相同
//设置给定对象的int值,使用volatile语义,即设置后立马更新到内存对其他线程可见
public native void putIntVolatile(Object o, long offset, int x);
//获得给定对象的指定偏移量offset的int值,使用volatile语义,总能获取到最新的int值。
public native int getIntVolatile(Object o, long offset);
//其他基本数据类型(long,char,byte,float,double)的操作与putIntVolatile
//及getIntVolatile相同,引用类型putObjectVolatile也一样。
..........省略代码
//与putIntVolatile一样,但要求被操作字段必须有volatile修饰
public native void putOrderedInt(Object o,long offset,int x);
2.4:数组操作
//获取数组第一个元素的偏移地址
public native int arrayBaseOffset(Class arrayClass);
//数组中一个元素占据的内存空间,arrayBaseOffset与arrayIndexScale配合使用,可定位数组中每个元素在内存中的位置
public native int arrayIndexScale(Class arrayClass);
2.5:CAS相关操作
//第一个参数o为给定对象,offset为对象内存的偏移量,通过这个偏移量迅速定位字段并设置或获取该字段的值,
//expected表示期望值,x表示要设置的值,下面3个方法都通过CAS原子指令执行操作。
public final native boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);
public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);
public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);
// =============== JDK1.8新增的 ====================
//1.8新增,给定对象o,根据获取内存偏移量指向的字段,将其增加delta,
//这是一个CAS操作过程,直到设置成功方能退出循环,返回旧值
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
//获取内存中最新值
v = getIntVolatile(o, offset);
//通过CAS操作
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
//1.8新增,方法作用同上,只不过这里操作的long类型数据
public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(o, offset, v, v + delta));
return v;
}
//1.8新增,给定对象o,根据获取内存偏移量对于字段,将其 设置为新值newValue,
//这是一个CAS操作过程,直到设置成功方能退出循环,返回旧值
public final int getAndSetInt(Object o, long offset, int newValue) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, newValue));
return v;
}
// 1.8新增,同上,操作的是long类型
public final long getAndSetLong(Object o, long offset, long newValue) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(o, offset, v, newValue));
return v;
}
//1.8新增,同上,操作的是引用类型数据
public final Object getAndSetObject(Object o, long offset, Object newValue) {
Object v;
do {
v = getObjectVolatile(o, offset);
} while (!compareAndSwapObject(o, offset, v, newValue));
return v;
}
2.6:线程操作
将一个线程进行挂起是通过park方法实现的,调用park后,线程将一直阻塞直到超时或者中断等条件出现。
unpark可以终止一个挂起的线程,使其恢复正常。
Java对线程的挂起操作被封装在LockSupport类中
LockSupport类中有各种版本pack方法,其底层实现最终还是使用Unsafe.park()方法和Unsafe.unpark()方法来实现。
//线程调用该方法,线程将一直阻塞直到超时,或者是中断条件出现。
public native void park(boolean isAbsolute, long time);
//终止挂起的线程,恢复正常.java.util.concurrent包中挂起操作都是在LockSupport类实现的,其底层正是使用这两个方法,
public native void unpark(Object thread);
2.7:内存屏障
//在该方法之前的所有读操作,一定在load屏障之前执行完成
public native void loadFence();
//在该方法之前的所有写操作,一定在store屏障之前执行完成
public native void storeFence();
//在该方法之前的所有读写操作,一定在full屏障之前执行完成,这个内存屏障相当于上面两个的合体功能
public native void fullFence();
三:Atomic原子包
JDK5之后推出的JUC并发包中提供了java.util.concurrent.atomic原子包
在该包下提供了大量基于CAS实现的原子操作类,如以后不想对程序代码加锁但仍然想避免线程安全问题,那么便可以使用该包下提供的类
原子包提供的操作类主要可分为如下四种类型:
- 基本类型原子操作类
- 引用类型原子操作类
- 数组类型原子操作类
- 属性更新原子操作类
3.1:基本类型原子操作类
Atomic包中提供的对于基本类型的原子操作类分别为AtomicInteger、AtomicBoolean、AtomicLong三个
它们的底层实现方式及使用方式是一致的,这里以AtomicInteger为例:
AtomicInteger主要是针对int类型的数据执行原子操作,它提供了原子自增方法、原子自减方法以及原子赋值方法等API
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// 获取指针类Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
//变量value在AtomicInteger实例对象内的内存偏移量
private static final long valueOffset;
static {
try {
//通过unsafe类的objectFieldOffset()方法,获取value变量在对象内存中的偏移
//通过该偏移量valueOffset,unsafe类的内部方法可以获取到变量value对其进行取值或赋值操作
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
//当前AtomicInteger封装的int变量值 value
private volatile int value;
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}
//获取当前最新值,
public final int get() {
return value;
}
//设置当前值,具备volatile效果,方法用final修饰是为了更进一步的保证线程安全
public final void set(int newValue) {
value = newValue;
}
//最终会设置成newValue,使用该方法后可能导致其他线程在之后的一小段时间内可以获取到旧值,有点类似于延迟加载
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
//设置新值并获取旧值,底层调用的是CAS操作即unsafe.compareAndSwapInt()方法
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
//如果当前值为expect,则设置为update(当前值指的是value变量)
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
//当前值自增加1,返回旧值,底层CAS操作
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//当前值自减扣1,返回旧值,底层CAS操作
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
//当前值增加delta,返回旧值,底层CAS操作
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
//当前值自增加1并返回自增后的新值,底层CAS操作
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
//当前值自减扣1并返回自减之后的新值,底层CAS操作
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
//当前值增加delta,返回新值,底层CAS操作
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
//省略一些不常用的方法....
}
可以得知,AtomicInteger原子类的所有方法中并没有使用到任何互斥锁机制来实现同步,而是通过我们前面介绍的Unsafe类提供的CAS操作来保障的线程安全,这里以最常见的incrementAndGet为例:
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
// unsafe.getAndAddInt()方法如下:
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
可看出getAndAddInt通过一个do while死循环不断的重试更新要设置的值,直到成功为止
调用的是Unsafe类中的compareAndSwapInt方法,是一个CAS操作方法
// 这里举一个小例子
public class AtomicIntegerDemo {
// 创建共享变量 atomicI
static AtomicInteger atomicI = new AtomicInteger();
public static class AddThread implements Runnable{
public void run(){
for(int i = 0; i < 10000; i++)
atomicI.incrementAndGet();
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
//开启5条线程同时执行atomicI的自增操作
for(int i = 1; i <= 5; i++){
threads[i]=new Thread(new AddThread());
}
//启动线程
for(int i = 1; i <= 5; i++){threads[i].start();}
for(int i = 1; i <= 5; i++){threads[i].join();}
System.out.println(atomicI);
}
}
//输出结果:50000
3.2:引用类型原子操作类
引用类型的原子操作类主要分析AtomicReference类,其他的原理及使用都是一致的
AtomicReference与我们前面分析的AtomicInteger实现的原理大致相同,最终都是通过Unsafe类中提供的CAS操作来实现的
关于AtomicReference的其他方法实现原理也大致相同,只不过Java8为AtomicReference新增了几个API:
- getAndUpdate(UnaryOperator)
- updateAndGet(UnaryOperator)
- getAndAccumulate(V,AnaryOperator)
- accumulateAndGet(V,AnaryOperator)
上述的方法几乎存在于所有的原子类中,而这些方法可以对期望值或要更新的值进行额外修改后再执行CAS更新。
public class AtomicReference<V> implements java.io.Serializable {
// 得到unsafe对象实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 定义值偏移量
private static final long valueOffset;
static {
try {
/* 静态代码块在类加载时为偏移量赋值,通过unsafe类提供的得到类属性的
地址API得到当前类定义的属性value的地址 */
valueOffset = unsafe.objectFieldOffset(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
//内部变量value,Unsafe类通过valueOffset内存偏移量即可获取该变量
private volatile V value;
/*
原子替换方法,间接调用Unsafe类的compareAndSwapObject(),
它是一个实现了CAS操作的本地(native)方法
*/
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
//设置并获取旧值
public final V getAndSet(V newValue) {
return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}
//省略代码......
}
//Unsafe类中的getAndSetObject方法,实际调用还是CAS操作
public final Object getAndSetObject(Object o, long offset, Object newValue) {
Object v;
do {
v = getObjectVolatile(o, offset);
} while (!compareAndSwapObject(o, offset, v, newValue));
return v;
}
举一个小例子
package com.cui.commonboot.myjuc;
import lombok.AllArgsConstructor;
import lombok.ToString;
import java.util.concurrent.atomic.AtomicReference;
/**
* <p>
* 功能描述:原子引用类测试
* </p>
*
* @author cui haida
* @date 2023/12/26/20:44
*/
public class Test06 {
@AllArgsConstructor
@ToString
static class Student {
private int id;
public String name;
public String getName() {
return name;
}
}
// 创建一个学生的原子引用
public static AtomicReference<Student> atomicStudentRef = new AtomicReference<>();
public static void main(String[] args) {
Student s1 = new Student(1, "张三");
atomicStudentRef.set(s1); // 将s1放入原子引用中
Student s2 = new Student(2, "李四");
atomicStudentRef.compareAndSet(s1, s2); // 如果当前原子引用是s1,设置成为s2, (cas)
System.out.println(atomicStudentRef.get().toString());
}
}
3.3:数组原子引用类
指的就是利用原子的形式更新数组中的元素从而避免出现线程安全问题。JUC包中的数组类型原子操作类具体分为以下三个类:
- AtomicIntegerArray:原子更新整数数组里的元素
- AtomicLongArray:原子更新长整数数组里的元素
- AtomicReferenceArray:原子更新引用类型数组里的元素
这里还是以AtomicIntegerArray为例,其他两个原理都一样
public class AtomicIntegerArray implements java.io.Serializable {
//获取Unsafe实例对象
private static final Unsafe unsafe = Unsafe.getUnsafe();
//从前面我们分析Unsafe类得知arrayBaseOffset()作用:获取数组的第一个元素内存起始地址
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int shift;
//内部数组
private final int[] array;
static {
//获取数组中一个元素占据的内存空间
int scale = unsafe.arrayIndexScale(int[].class);
//判断是否为2的次幂,一般为2的次幂否则抛异常
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
//
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
//计算数组中每个元素的的内存地址
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}
//省略代码......
}
arrayBaseOffset可以获取一个数组中的第一个元素内存起始位置
arrayIndexScale则可以得知数组中指定下标元素的内存占用空间大小(int -> 4Byte(字节),所以scale的值为4。
所以:数组每个元素起始内存位置 = 数组第一个元素起始位置 + 数组元素下标 * 数组中每个元素占用的内存空间大小
byteOffset(int)方法可以根据数组下标计算出每个元素的内存地址。
而AtomicIntegerArray中的其他方法都是间接调用Unsafe类的CAS原子操作方法实现
demo
public class AtomicIntegerArrayDemo {
// 创建一个大小为5的原子数组
static AtomicIntegerArray atomicArr = new AtomicIntegerArray(5);
public static class incrementTask implements Runnable{
public void run(){
// 执行数组中元素自增操作,参数为index,即数组下标
for(int i = 0; i < 10000; i++) {
atomicArr.getAndIncrement(i % atomicArr.length());
}
}
}
public static void main(String[] args) throws InterruptedException {
// 开五个线程同时跑
Thread[] threads = new Thread[5];
for(int i = 0; i < 5;i++) threads[i] = new Thread(new incrementTask());
for(int i = 0; i < 5;i++) threads[i].start();
for(int i = 0; i < 5;i++) threads[i].join();
System.out.println(atomicArr);
/* 执行结果:
[10000, 10000, 10000, 10000, 10000]
*/
}
}
几个常用方法
//执行自增操作,返回旧值,入参i是index即数组元素下标
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
//指定下标元素执行自增操作,并返回新值
public final int incrementAndGet(int i) {
return getAndAdd(i, 1) + 1;
}
//指定下标元素执行自减操作,并返回新值
public final int decrementAndGet(int i) {
return getAndAdd(i, -1) - 1;
}
//间接调用unsafe.getAndAddInt()方法
public final int getAndAdd(int i, int delta) {
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}
//Unsafe类中的getAndAddInt方法,执行CAS操作
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
3.4:属性更新原子操作类
如果我们只需要一个类的某个字段(类属性)也变为原子操作即让某个普通变量也变为原子操作,可以使用属性更新原子操作类
JUC中提供了三个类操作更新:
- AtomicIntegerFieldUpdater:更新整型的属性的原子操作类
- AtomicLongFieldUpdater:更新长整型属性的原子操作类
- AtomicReferenceFieldUpdater:更新引用类型中属性的原子操作类
不过使用原子更新类的条件是比较苛刻的,如下:
- 操作的字段不能被static修饰
- 操作的字段不能被final修饰,因为常量无法修改
- 操作的字段必须被volatile修饰保证可见性,也就是需要保证数据的读取是线程可见的
- 属性必须对当前的Updater所在的区域是可见的
- 如果不是当前类内部进行原子更新器操作不能使用private,protected修饰符。
- 子类操作父类时修饰符必须是protected权限及以上
- 如果在同一个package下则必须是default权限及以上
- 也就是说无论何时都应该保证操作类与被操作类间的可见性。
原理比较复杂,总之就是反射 + Unsafe.cas那一套,这里只写一个demo演示下怎么用
package com.cui.commonboot.myjuc;
import lombok.AllArgsConstructor;
import lombok.ToString;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* <p>
* 功能描述:
* </p>
*
* @author cui haida
* @date 2023/12/26/21:05
*/
public class AtomicIntegerFieldUpdaterDemo {
public static class Course {
int courseId;
String courseName;
volatile int courseScore; // 注意这个属性
}
@AllArgsConstructor
@ToString
public static class Student {
int studentId;
volatile String studentName; // 注意这个属性
}
// 定义更新整型的属性的原子操作类,目标属性:Course.courseScore
static AtomicIntegerFieldUpdater<Course> courseAIFU =
AtomicIntegerFieldUpdater.newUpdater(Course.class, "courseScore");
// 定义更新引用类型中属性的原子操作类,目标属性:Student.studentName
static AtomicReferenceFieldUpdater<Student, String> studentARFU =
AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "studentName");
// 定义原子计数器效验数据准确性
public static AtomicInteger courseScoreCount = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
final Course course = new Course();
Thread[] threads = new Thread[1000];
for (int i = 0; i < 1000; i++) {
threads[i] = new Thread(() -> {
if (Math.random() > 0.6) {
courseAIFU.incrementAndGet(course);
courseScoreCount.incrementAndGet();
}
});
threads[i].start();
}
for (int i = 0; i < 1000; i++) threads[i].join();
System.out.println("Course.courseScore:" + course.courseScore); // 395
System.out.println("数据效验结果:" + courseScoreCount.get()); // 395
// 更新引用类型中属性的原子操作类的demo
Student student = new Student(1, "张三");
studentARFU.compareAndSet(student, student.studentName, "李四");
System.out.println(student); // AtomicIntegerFieldUpdaterDemo.Student(studentId=1, studentName=李四)
}
}
3.5:LongAdder和AtomicLong
LongAdder是JDK1.8由Doug Lea大神新增的原子操作类,位于java.util.concurrent.atomic包下
LongAdder在高并发的场景下会比AtomicLong 具有更好的性能,代价是消耗更多的内存空间
在AtomicLong中,使用的是Unsafe.getAndAddLong()来进行自增,而这个方法是通过CAS自旋得到的
所以在高并发情况下,当有大量线程同时去更新一个变量,任意一个时间点只有一个线程能够成功,绝大部分的线程在尝试更新失败后,会通过自旋的方式再次进行尝试,这样严重占用了CPU的时间片,进而导致系统性能问题。
LongAdder设计思想上,采用分段的方式降低并发冲突的概率。通过维护一个基准值base和 Cell 数组:
- 当没有出现多线程竞争的情况,线程会直接对base里面的value进行修改。
- 当多线程的时候,会执行如下操作:
- LongAdder会初始化一个cell数组,然后对每个线程获取对应的hash值
- 之后通过hash & (size -1)[size为cell数组的长度]将每个线程定位到对应的cell单元格
- 之后这个线程将值写入对应的cell单元格中的value
- 之后将所有cell单元格的value和base中的value进行累加求和得到最终的值。
🎉 而且每个线程竞争的Cell的下标不是固定的,如果CAS失败,会重新获取新的下标去更新,从而极大地减少了CAS失败的概率。
和AtomicLong比较实例
package com.cui.commonboot.myjuc;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
/**
* <p>
* 功能描述:
* </p>
*
* @author cui haida
* @date 2023/12/26/21:25
*/
public class Test07 {
public static void main(String[] args) throws InterruptedException {
t1();
t2();
}
public static void t1() throws InterruptedException {
LongAdder adder = new LongAdder();
Thread[] threads = new Thread[10];
long startTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 100000000; j++) {
adder.add(1);
}
});
threads[i].start();
}
for (int i = 0; i < 10; i++) {
threads[i].join();
}
System.out.println("add longer time is: " + (System.currentTimeMillis() - startTime));
}
public static void t2() throws InterruptedException {
AtomicLong atomicLong = new AtomicLong(0);
Thread[] threads = new Thread[10];
long startTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 100000000; j++) {
atomicLong.getAndIncrement();
}
});
threads[i].start();
}
for (int i = 0; i < 10; i++) {
threads[i].join();
}
System.out.println("atomic long time is: " + (System.currentTimeMillis() - startTime));
}
}
测试数据较小的时候还是atomic快
package com.cui.commonboot.myjuc;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
/**
* <p>
* 功能描述:
* </p>
*
* @author cui haida
* @date 2023/12/26/21:25
*/
public class Test07 {
public static void main(String[] args) throws InterruptedException {
t1();
t2();
}
public static void t1() throws InterruptedException {
LongAdder adder = new LongAdder();
Thread[] threads = new Thread[10];
long startTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
adder.add(1);
}
});
threads[i].start();
}
for (int i = 0; i < 10; i++) {
threads[i].join();
}
System.out.println("add longer time is: " + (System.currentTimeMillis() - startTime));
}
public static void t2() throws InterruptedException {
AtomicLong atomicLong = new AtomicLong(0);
Thread[] threads = new Thread[10];
long startTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
atomicLong.getAndIncrement();
}
});
threads[i].start();
}
for (int i = 0; i < 10; i++) {
threads[i].join();
}
System.out.println("atomic long time is: " + (System.currentTimeMillis() - startTime));
}
}
3.6:ABA问题和原子标记引用
3.6.1:ABA问题
当第一个线程执行CAS(V,E,N)操作,在获取到当前变量V,准备修改为新值N前,另外两个线程已连续修改了两次变量V的值
使得该值又恢复为第一个线程看到的原有值,这样的话,我们就无法正确判断这个变量是否已被修改过
线程t1先看到共享变量的值为0,在准备更新成为1之前,t2,t3分别做了两次更新,最后又将共享变量的值更新成为了0
此时T1再回来进行更新的时候是无法得知这个值已经被其他线程更改过的,在做V==E判断时发现atomicI还是原先的值,就会对atomicI进行更改操作,但是此时的现场与T1线程第一次看到的值时的现场不同了
而上面这种情况一般情况下发生的几率很小,而且就算发生了一般业务也不会造成什么问题,就算出现了ABA问题也不会造成影响
但是有些情况下的ABA问题还是要注意一下的,例如下面是单向链表实现的堆栈stack:
- 如果你的应用场景中存在需要基于动态变化而要做出的操作,ABA问题的出现就需要解决
- 如果你的应用场景只停留在数据表面得到的结果而做的判断,那么ABA问题你就可以不用去关注。
3.6.2:ABA问题的解决方案
在我们通过其他形式去实现乐观锁时通常会通过version版本号来进行标记从而避免并发带来的问题
在Java中解决CAS的ABA问题主要有两种方案:
- AtomicStampedReference:时间戳控制,能够完全解决
- AtomicMarkableReference:维护boolean值控制,不能完全杜绝
AtomicStampedReference
AtomicStampedReference是一个带有时间戳的对象引用,内部通过包装Pair对象键值对的形式来存储数据与时间戳
在每次更新时,先对数据本身和时间戳进行比对,只有当两者都符合预期值时才调用Unsafe的compareAndSwapObject方法进行写入。
当然,AtomicStampedReference不仅会设置新值而且还会记录更改的时间戳。这也就解决了之前CAS机制带来的ABA问题。
public class AtomicStampedReference<V> {
// 通过Pair内部类存储数据和时间戳
private static class Pair<T> {
final T reference; // 泛型引用
final int stamp; // 时间戳
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
// 存储数值和时间的内部类
private volatile Pair<V> pair;
// 构造方法:初始化时需传入初始值和时间初始值
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
}
// =================== 一个小demo感受一下 =====================
public class ABAIssue {
// 定义原子计数器,初始值 = 100
private static AtomicInteger atomicI = new AtomicInteger(100);
// 定义AtomicStampedReference:初始化时需要传入一个初始值和初始时间
private static AtomicStampedReference<Integer> asRef = new AtomicStampedReference<Integer>(100, 0);
/**
* 未使用AtomicStampedReference线程组:TA TB
*/
private static Thread TA = new Thread(() -> {
System.err.println("未使用AtomicStampedReference线程组:[TA TB] >>>>");
// 更新值为101
boolean flag = atomicI.compareAndSet(100, 101);
System.out.println("线程TA:100 -> 101.... flag:" + flag + ",atomicINewValue:" + atomicI.get());
// 更新值为100
flag = atomicI.compareAndSet(101, 100);
System.out.println("线程TA:101 -> 100.... flag:" + flag + ",atomicINewValue:" + atomicI.get());
});
private static Thread TB = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean flag = atomicI.compareAndSet(100, 888);
System.out.println("线程TB:100 -> 888.... flag:" + flag + ",atomicINewValue:" + atomicI.get() + "\n\n");
});
/**
* 使用AtomicStampedReference线程组:T1 T2
*/
private static Thread T1 = new Thread(() -> {
System.err.println("使用AtomicStampedReference线程组:[T1 T2] >>>>");
// 更新值为101
boolean flag = asRef.compareAndSet(100, 101, asRef.getStamp(), asRef.getStamp() + 1);
System.out.println("线程T1:100 -> 101.... flag:" + flag + ".... asRefNewValue:" + asRef.getReference() + ".... 当前Time:" + asRef.getStamp());
// 更新值为100
flag = asRef.compareAndSet(101, 100, asRef.getStamp(), asRef.getStamp() + 1);
System.out.println("线程T1:101 -> 100.... flag:" + flag + ".... asRefNewValue:" + asRef.getReference() + ".... 当前Time:" + asRef.getStamp());
});
private static Thread T2 = new Thread(() -> {
int time = asRef.getStamp();
System.out.println("线程休眠前Time值:" + time);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean flag = asRef.compareAndSet(100, 888, time, time + 1);
System.out.println("线程T2:100 -> 888.... flag:" + flag + ".... asRefNewValue:" + asRef.getReference() + ".... 当前Time:" + asRef.getStamp());
});
public static void main(String[] args) throws InterruptedException {
TA.start();
TB.start();
TA.join();
TB.join();
T1.start();
T2.start();
}
}
/**
* 未使用AtomicStampedReference线程组:[TA TB] >>>>
* 线程TA:100 -> 101.... flag:true,atomicINewValue:101
* 线程TA:101 -> 100.... flag:true,atomicINewValue:100
* 线程TB:100 -> 888.... flag:true,atomicINewValue:888
*
*
* 使用AtomicStampedReference线程组:[T1 T2] >>>>
* 线程休眠前Time值:0
* 线程T1:100 -> 101.... flag:true.... asRefNewValue:101.... 当前Time:1
* 线程T1:101 -> 100.... flag:true.... asRefNewValue:100.... 当前Time:2
* 线程T2:100 -> 888.... flag:false.... asRefNewValue:100.... 当前Time:2
*/
AtomicMarkableReference
AtomicMarkableReference只能在一定程度上减少ABA问题的出现,它并不能完全的杜绝ABA问题。
因为AtomicMarkableReference内部维护的是boolean类型的标识,只会在true与false两种状态之间来回切换,所以还是存在ABA问题出现的概念。