上一篇悲观锁中,我们讲到悲观锁的四种状态时,提到了“无锁”这种状态,并解释其有两种语义,一种是对共享资源不进行任何同步原语保护;另一种是共享资源会出现被竞争的情况,但是不使用操作系统同步进行保护,而是使用诸如CAS这种方式进行线程同步,这样能够尽量将获取锁释放锁的操作仅在用户空间内完成,大幅度减少操作系统用户态和内核态之间的切换次数,在很多情况下就能够提升程序性能,这也被称为“乐观锁”。
乐观与悲观
假设现在有多个线程想要操作同一个资源对象,很多人的第一反应就是使用互斥锁。但互斥锁的同步方式是悲观的,什么是“悲观”呢?简单来说,就是操作系统将会悲观地认为,如果不严格同步线程调用,那么一定会产生异常,所以互斥锁将会锁定资源,只供一个线程调用,而阻塞其他线程,让其他线程等待,因此,这种同步机制也叫做“悲观锁”。
但悲观锁不是在所有情况下都适用,比如在一些情况下,同步代码块执行的耗时远远小于线程切换的耗时,这样就很不划算。程序员们可能更加希望一些场景下,能够在用户态中对线程的切换进行管理,这样效率更高。所以,我们不想让操作系统那么“悲观”,每次都使用同步原语对共享资源进行锁定,而是希望让线程反复“乐观”地去尝试获取共享资源,如果发现空闲,那么使用,如果被占用,那么继续“乐观”地重试。
CAS
CAS (Compare And Swap)可以简单翻译为:比较然后交换。很多人都听说过CAS,但是对于它究竟是如何工作的?需要哪些外部支持?如何应用到业务中?可能并不是很了解,下面我就通过一个通俗的例子来进行介绍。
我们现在假设有一间房间,房门上挂着一块牌子,正面是0,反面是1,这块牌子代表房间是否被占用的状态。当显示0的时候,房间为空,谁都可以进入,当显示1时,则代表有人正在使用。在上面这个比喻里,房间就是共享资源,号码牌就是一把乐观锁,人就是线程。
假设此时A和B这两条线程都看到了牌子上显示的是0,于是争抢着去使用房间。但是A线程抢先获得了时间片,他第一个冲进房间并将这块牌子的状态改为1,此时B线程才冲过来,但是发现牌子上的状态已经被改为1,不过B线程没有放弃,不断回来看看牌子是否变回0。
这样,你应该就能够很容易地理解CAS,当共享资源的状态值为0的一瞬间,A、B线程读到了。此时这两条线程认为共享资源当前空闲未被占用,于是它们各自将会生成两个值。
1. old value,代表之前读到的资源对象的状态值
2. new value,代表想要将资源对象的状态值更新后的值
这里对AB线程来说,old value都是0,new value都是1。
此时AB线程争抢着去修改资源对象的状态值,然后占用它。假设A线程运气比较好,率先获得时间片时,他将old value与资源对象的状态值进行compare,发现一致,于是将牌子上的值swap为new value。而线程B没有那么幸运,它落后了一步,此时资源对象的状态值已经被A线程修改成了1,所以B线程在compare的时候,发现和自己预期的old value不一致,所以放弃swap操作。
但在实际应用中,我们不会让B线程就这么放弃,通常会使其自旋,自旋就是使其不断重试cas操作,通常会配置自旋次数来防止死循环。
下面我们用代码来展示一下CAS函数,非常简单,相信你一眼就能看懂。
int cas(long *addr, long oldValue, long newValue)
if(* addr != old)
return 0;
*addr = new;
return 1;
}
此时,细心的小伙伴就会发现,通过上面的描述,关于通过CAS来独占资源的设计似乎并不完善,存在一个严重漏洞!
因为看上去这个CAS函数本身没有进行任何同步措施,似乎还是存在线程不安全的问题。比如A线程看到牌子的状态是0,伸手去翻的一瞬间,很有可能B线程突然抢到时间片,将牌子翻成了1,但是线程A不知情,也将牌子翻到了1,这就出现了线程安全问题,AB线程同时获得了资源,好比两个人进入了更衣室,非常尴尬。
这么看来,一个有待解决的问题是,“比较数值是否一致并且修改数值”的这个动作,必须要么成功要么失败,不能存在中间状态,换句话说,CAS操作必须是原子性的。只有基于这个真理,我们前面的所有设想才能成立。
那么,如何实现CAS的原子性呢?所幸的是,各种不同架构的CPU都提供了指令级的CAS原子操作,比如在x86架构下,通过cmpxchg指令支持CAS,在ARM下,通过LL/SC来实现CAS。也就是说,既然CPU已经原生地支持了CAS,那么上层进行调用即可。现在,除了通过操作系统的同步原语(比如mutex)来有锁地实现线程同步(悲观),通过CAS的方式我们能实现另一种无锁的同步机制(乐观)。
这些通过CAS来实现同步的工具,由于不会锁定资源,而且当线程需要修改共享资源对象时,总是会乐观地认为对象状态值没有被其他线程修改过,自己主动尝试去Compare And Set状态值,相较于上文提到的“悲观锁”,这种同步机制被称作“乐观锁”。
Java中的乐观锁编程
下面,我们就回到Java,来谈一谈在Java中,是如何利用CAS特性来进行乐观锁编程的。
我们了解了CAS,知道了它是由底层指令架构支持的,那么上层如何封装调用,我们如何将CAS应用到我们的代码中?很多同学对它的认知可能还是模糊的,下面我们就以一个简单的实际的例子,来加深对CAS及其应用的理解。
假设有一个简单的需求,你需要使用5条线程,将一个值,从0累加到800,你该怎么做?
首先我写一种错误的写法,不使用任何同步操作,那么一定会出现线程安全问题。
1 public static Integer num = 0;
2
3 public static void main(String[] args) throws InterruptedException {
4 for (int i = 0; i <5 ; i++ ) {
5 Thread t = new Thread(new Runnable() {
6 @Override
7 public void run() {
8 while (num < 800) {
9 num++;
10 System.out.println(Thread.currentThread().getName() +
":" + num);
11 }
12 }
13 });
14 t.start();
15 }
16 }
如何使用乐观锁实现呢?非常简单。我们要善用轮子。写过Java的同学应该都知道AtomicInteger这个类,它的底层通过CAS来实现了同步的计数器。我们可以将代码改成这样:
1 static AtomicInteger num = new AtomicInteger(0);
2
3 public static void main(String[] args) throws InterruptedException {
4 for (int i = 0; i <5 ; i++ ) {
5 Thread t = new Thread(new Runnable() {
6 @Override
7 public void run() {
8 while (num.get() < 800) {
9 num++;
10 System.out.println(Thread.currentThread().getName() +
":" + num);
11 }
12 }
13 });
14 t.start();
15 }
16 }
当然,写这段代码,实现这个功能不是我们的目的。我们需要关注的是AtomicInteger底层是如何通过CAS来做到无锁同步的。
AtomicInteger这个类的内容不多,主要的成员变量就是一个Unsafe类型的实例和一个Long类型的offset,这边注释也开门]见山,告诉我们使用Unsafe的CAS操作来对值进行更新。我们看incrementAndGet方法,可以看到直接调用了Unsafe对象的getAndAddInt方法,进一步点进去,可以看到确实就是调用了Unsafe的compareAndSwaplnt方法(CAS)。这里出现了一个循环,实际上这就是我之前提及的“自旋”。
我们可以看到compareAndSwap lnt()方法存在native修饰符,那么说明这是一个本地方法,和具体的平台实现相关。如果你的CPU是x86架构,那么事实上这个本地方法将会调用系统的cmpxchg指令。我们可以在openjdk源码中的hotspot/src/share/ vm/ pr ims /unsafe.cpp和hotspot/src/share/ vm/ runtime/Atomic.cpp路径下找到,这些本地方法是c++写的。
jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte*dest, jbyte
compare_value) {
assert (sizeof(jbyte) == 1,"assumption.");
uintptr_t dest_addr = (uintptr_t) dest;
uintptr_t offset = dest_addr % sizeof(jint);
volatile jint*dest_int = ( volatile jint*)(dest_addr - offset);
// 对象当前值
jint cur = *dest_int;
// 当前值cur的地址
jbyte * cur_as_bytes = (jbyte * ) ( & cur);
// new_val地址
jint new_val = cur;
jbyte * new_val_as_bytes = (jbyte *) ( & new_val);
// new_val 存 exchange_value,后面修改则直接从 new_val 中取值
new_val_as_bytes[offset] = exchange_value;
// 比较当前值与期待值,如果相同则更新,不同则直接返回
while (cur_as_bytes[offset] == compare_value) {
// 调用汇编指令 cmpxchg 执行 CAS 操作,期望值为 cur,更新为 new_val
jint res = cmpxchg(new_val, dest_int, cur);
if (res == cur) break;
cur = res;
new_val = cur;
new_val_as_bytes[offset] = exchange_value;
}
// 返回当前值
return cur_as_bytes[offset];
}
我们可以看到这边,调用了汇编指令。