【并发】深度解析CAS原理与底层源码
什么是 CAS?
CAS全称是(Compare And Swap,比较并交换),通常指的是这样一种原子操作(针对一个变量,首先比较它的内存值与某个期望值是否相同,如果相同,就给它赋一个新值)
我们用伪代码描述起来就是:
if(value == expectValue) {
value = newValue;
}
以上伪代码描述了一个由比较和赋值两阶段组成的复合操作。
而 CAS 可以看作是它们合并后的整体!一个不可分割的原子操作,并且其原子性是直接在硬件层面得到保障的。
CAS可以看做是乐观锁(对比数据库的悲观、乐观锁)的一种实现方式,我们稍后要讲的Java原子类中的递增操作(i++)就通过CAS自旋实现的。
CAS是一种无锁算法,在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步!
CAS案例分析
我们先来看看CAS是怎么解决多线程并发问题的!
一、不采用任何限制
这里要求开10个线程,每个线程执行1万次的“++操作”
public class Test {
private volatile static int sum = 0;
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
sum++;
}
});
thread.start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 30421
System.out.println(sum);
}
}
由于我们这里没有使用任务“限制”,线程之间没有同步关系,最后输出的结果肯定不是10万!
二、使用synchronized锁解决线程安全问题
public class Test {
private volatile static int sum = 0;
static Object object = "";
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
synchronized (object) {
for (int j = 0; j < 10000; j++) {
sum++;
}
}
});
thread.start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 100000
System.out.println(sum);
}
}
显然,加锁之后,结果就是我们预计的那样,这种方案可以解决大多数场景。
虽然, synchronized 在JDK1.8的时候优化了很多,但是由于Java线程是与操作系统线程一一对应的。Java的线程的切换,需要涉及到操作系统用户态到内核态的切换,是一个很重的操作!!!而synchronized的使用就会导致这种上下文切换!!!
三、使用CAS解决线程安全问题
在 Java 中,CAS 操作是由 Unsafe 类提供支持的,该类定义了三种针对不同类型变量的 CAS 操做。
(1)Unsafe的基本使用
Java中的Unsafe的介绍与使用_面向鸿蒙编程的博客-CSDN博客https://blog.csdn.net/weixin_43715214/article/details/128260404这篇JavaGuide文章写得很好,可以看看
Java 魔法类 Unsafe 详解 (javaguide.cn)https://javaguide.cn/java/basis/unsafe.html#unsafe-%E5%8A%9F%E8%83%BD不能通过new的方式去创建一个Unsafe对象,必须要通过反射。
(2)自定义的UnsafeFactory
在项目开发中,如果要用到Unsafe,一般要通过自定义一个Unsafe工厂类,去创建Unsafe对象。
public class UnsafeFactory {
// 获取 Unsafe 对象
public static Unsafe getUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
// 获取字段的内存偏移量
public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
try {
return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
}
我们可以看 Unsafe类 的源码,分别定义了Object、int、long类型的比较与交换
它们都是 native 方法,由 Java 虚拟机 提供具体实现,这意味着不同的 Java 虚拟机对它们的实现可能会略有不同。
我们可以先研究一下这三个API
(3)Unsafe的关于CAS的API
public class CASTest {
public static void main(String[] args) {
Entity entity = new Entity();
Unsafe unsafe = UnsafeFactory.getUnsafe();
long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");
// 12
System.out.println(offset);
boolean successful;
// 4个参数分别是:对象实例、字段的内存偏移量、字段期望值、字段更新值
// true 3
successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
System.out.println(successful + "\t" + entity.x);
// true 5
successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
System.out.println(successful + "\t" + entity.x);
// false 5
successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
System.out.println(successful + "\t" + entity.x);
}
}
class Entity{
int x;
}
compareAndSwapInt(Object var1, long var2, int var4, int var5);
上述的参数分别是:对象实例、字段内存偏移量、字段期望值、字段更新值
针对 entity.x 的 3 次 CAS 操作,分别试图将它从 0 改成 3、从 3 改成 5、从 3 改成 8。
其中前两次是成功的!最后一次失败了,返回false,因为字段的值已经在上一次CAS中更新为5,但是此时的期望值为3,不符,所以返回false。
偏移量怎么算来的???
Java中的对象在内存中是怎么去存储的?
以Entry为例,对象头占8个字节,指针占4个字节,假如对象中没有定义属性(空的),那么操作系统会再为其分配一个对其填充为4个字节,所以,换句话说,new一个对象最少需要16个字节!
那这个例子中,Entry里面有一个变量x,所以x的偏移量就是12
再举一个例子,如下:
class Entry {
int x;
int y;
}
x的偏移量为12,y为16
(4)自定义一个CASLock锁工具类
// 工具类
public class CASLock {
//加锁标记
private volatile int state;
private static final Unsafe UNSAFE;
private static final long OFFSET;
static {
try {
UNSAFE = UnsafeFactory.getUnsafe();
// 偏移量
OFFSET = UnsafeFactory.getFieldOffset(
UNSAFE, CASLock.class, "state");
} catch (Exception e) {
throw new Error(e);
}
}
// CAS操作,值为0,则更新为1
public boolean cas() {
return UNSAFE.compareAndSwapInt(this, OFFSET, 0, 1);
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
}
(5)CAS实现线程同步
public class Test {
private volatile static int sum = 0;
static CASLock casLock = new CASLock();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
for(;;){
if(casLock.getState() == 0 && casLock.cas()) {
try {
for (int j = 0; j < 10000; j++) {
sum++;
}
} finally {
// 改回0
casLock.setState(0);
}
break;
}
}
});
thread.start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 100000
System.out.println(sum);
}
}
这里开了10个线程,同一时刻,只会有一个线程进入到如下逻辑里面
if(casLock.getState() == 0 && casLock.cas()) { ... }
其它的线程都会在此处“自旋”!
因为,当第一个线程进去后,它进行了cas操作,会将state值改为1,所以它们get到的state值都是1,不满足这里的 casLock.getState() == 0 条件。
故,这样也就实现了“无锁”的线程变量同步,避免了上下文切换,线程由用户态到内核态的切换。
四、其它方式
当然了,实现这个小案例的方式有很多。
我们这里只是为了讲CAS原理和源码,才会采用这种方式。像AtomicInteger、Reentrantlock也都可以解决这个问题!
CAS源码分析
Hotspot 虚拟机对compareAndSwapInt 方法的实现如下:
/** unsafe.cpp
* obj:对象
* offset:对象偏移量
* e:比较值
* x:更新值
*/
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 根据偏移量,计算value的地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// Atomic::cmpxchg(x, addr, e) cas逻辑 x:要交换的值 e:要比较的值
// cas成功,返回期望值e,等于e,此方法返回true
// cas失败,返回内存中的value值,不等于e,此方法返回false
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
核心逻辑在Atomic::cmpxchg方法中,这个根据不同操作系统和不同CPU会有不同的实现。
这里我们以 linux_64x 的为例
#atomic_linux_x86.inline.hpp
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
//判断当前执行环境是否为多处理器环境
int mp = os::is_MP();
//LOCK_IF_MP(%4) 在多处理器环境下,为 cmpxchgl 指令添加 lock 前缀,以达到内存屏障的效果
//cmpxchgl 指令是包含在 x86 架构及 IA-64 架构中的一个原子条件指令,
//它会首先比较 dest 指针指向的内存值是否和 compare_value 的值相等,
//如果相等,则双向交换 dest 与 exchange_value,否则就单方面地将 dest 指向的内存值交给exchange_value。
//这条指令完成了整个 CAS 操作,因此它也被称为 CAS 指令。
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
里面的cmpxchgl指令,就是linux_64x 架构下的比较与交换
首先,输入是"r" (exchange_value), “a” (compare_value), “r” (dest), “r” (mp),表示compare_value存入eax寄存器,而exchange_value、dest、mp的值存入任意的通用寄存器。嵌入式汇编规定把输出和输入寄存器按统一顺序编号,顺序是从输出寄存器序列从左到右从上到下以“%0”开始,分别记为%0、%1···%9。也就是说,输出的eax是%0,输入的exchange_value、compare_value、dest、mp分别是%1、%2、%3、%4。
因此,cmpxchg %1,(%3)实际上表示cmpxchg exchange_value,(dest)
需要注意的是cmpxchg有个隐含操作数eax,其实际过程是先比较eax的值(也就是compare_value)和dest地址所存的值是否相等,
输出是"=a" (exchange_value),表示把eax中存的值写入exchange_value变量中。
Atomic::cmpxchg这个函数最终返回值是exchange_value,也就是说,如果cmpxchgl执行时compare_value和dest指针指向内存值相等则会使得dest指针指向内存值变成exchange_value,最终eax存的compare_value赋值给了exchange_value变量,即函数最终返回的值是原先的compare_value。此时Unsafe_CompareAndSwapInt的返回值(jint)(Atomic::cmpxchg(x, addr, e)) == e就是true,表明CAS成功。如果cmpxchgl执行时compare_value和(dest)不等则会把当前dest指针指向内存的值写入eax,最终输出时赋值给exchange_value变量作为返回值,导致(jint)(Atomic::cmpxchg(x, addr, e)) == e得到false,表明CAS失败。
现代处理器指令集架构基本上都会提供 CAS 指令,例如 x86 和 IA-64 架构中的 cmpxchgl 指令和 comxchgq 指令,sparc 架构中的 cas 指令和 casx 指令。
不管是 Hotspot 中的 Atomic::cmpxchg 方法,还是 Java 中的 compareAndSwapInt 方法,它们本质上都是对相应平台的 CAS 指令的一层简单封装。CAS 指令作为一种硬件原语,有着天然的原子性,这也正是 CAS 的价值所在。
CAS的缺点
CAS 虽然高效地解决了原子操作,但是还是存在一些缺陷的,主要表现在三个方面:
- 自旋 CAS 长时间地不成功,则会给 CPU 带来非常大的开销
- 只能保证一个共享变量原子操作
- ABA 问题