CAS&Atomic 原子操作详解
什么是原子操作?如何实现原子操作?
什么是原子性?相信很多同学在工作中经常使用事务,事务的一大特性就是原子性(事务具有 ACID 四大特性),一个事务包含多个操作,这些操作要么全部执行,要么全都不执行。
并发里的原子性和原子操作是一样的内涵和概念,假定有两个操作 A 和 B 都包含多个步骤,如果从执行 A 的线程来看,当另一个线程执行 B 时,要么将 B 全部执行完,要么完全不执行 B,执行 B 的线程看 A 的操作也是一样的,那么 A 和 B 对彼此来说是原子的。
实现原子操作可以使用锁,锁机制,满足基本的需求是没有问题的了,但是有的时候我们的需求并非这么简单,我们需要更有效,更加灵活的机制,synchronized 关键字是基于阻塞的锁机制,也就是说当一个线程拥有锁的时候,访问同一资源的其它线程需要等待,直到该线程释放锁。
这里会有些问题:首先,如果被阻塞的线程优先级很高很重要怎么办?其次, 如果获得锁的线程一直不释放锁怎么办?同时,还有可能出现一些例如死锁之类的情况,最后,其实锁机制是一种比较粗糙,粒度比较大的机制,相对于像计数器这样的需求有点儿过于笨重。为了解决这个问题,Java 提供了 Atomic 系列的原子操作类。
这些原子操作类其实是使用当前的处理器基本都支持 CAS 的指令,比如 Intel 的汇编指令 cmpxchg,每个厂家所实现的具体算法并不一样,但是原理基本一样。每一个 CAS 操作过程都包含三个运算符:一个内存地址 V,一个期望的值 A 和一 个新值 B,操作的时候如果这个地址上存放的值等于这个期望的值 A,则将地址上的值赋为新值 B,否则不做任何操作。
CAS 的基本思路就是,如果这个地址上的值和期望的值相等,则给其赋予新值,否则不做任何事儿,但是要返回原值是多少。自然 CAS 操作执行完成时,在业务上不一定完成了,这个时候我们就会对 CAS 操作进行反复重试,于是就有了循环 CAS。很明显,循环 CAS 就是在一个循环里不断的做 cas 操作,直到成功为止。Java 中的 Atomic 系列的原子操作类的实现则是利用了循环 CAS 来实现。
CAS 实现原子操作的三大问题
ABA 问题
因为 CAS 需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。
ABA 问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加 1,那么 A→B→A 就会变成 1A→2B→3A。举个通俗点的例子,你倒了一杯水放桌子上,干了点别的事,然后同事把你水喝了又给你重新倒了一杯水,你回来看水还在,拿起来就喝,如果你不管水中间被人喝过,只关心水还在,这就是 ABA 问题。
如果你是一个讲卫生讲文明的小伙子,不但关心水在不在,还要在你离开的时候水被人动过没有,因为你是程序员,所以就想起了放了张纸在旁边,写上初始值 0,别人喝水前麻烦先做个累加才能喝水。
循环时间长开销大。
自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。
只能保证一个共享变量的原子操作。
当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操 作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁。还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比 如,有两个共享变量 i = 2 , j=a ,合并一下 ij=2a ,然后用 CAS 来操作 ij 。从 Java 1.5开始,JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行 CAS 操作。
Jdk 中相关原子操作类的使用
AtomicInteger
•int addAndGet(int delta):以原子方式将输入的数值与实例中的值(AtomicInteger 里的 value)相加,并返回结果。
•boolean compareAndSet (int expect,int update ):如果输入的数值等于预期值,则以原子方式将该值设置为输入的值。•int getAndIncrement() :以原子方式将当前值加 1 ,注意,这里返回的是自增前的值。•int getAndSet (int newValue ):以原子方式设置为 newValue 的值,并返回旧值。AtomicIntegerArray
主要是提供原子的方式更新数组里的整型,其常用方法如下。
•int addAndGet (int i,int delta ):以原子方式将输入值与数组中索引 i 的元素相加。•boolean compareAndSet (int i,int expect,int update ):如果当前值等于预期值,则以原子方式将数组位置 i 的元素设置成 update 值。需要注意的是,数组 value 通过构造方法传递进去,然后 AtomicIntegerArray会将当前数组复制一份,所以当 AtomicIntegerArray 对内部的数组元素进行修改时,不会影响传入的数组。更新引用类型
原子更新基本类型的 AtomicInteger,只能更新一个变量,如果要原子更新多个变量,就需要使用这个原子更新引用类型提供的类。Atomic 包提供了以下 3 个类。
AtomicReference
原子更新引用类型。
AtomicStampedReference
利用版本戳的形式记录了每次改变以后的版本号,这样的话就不会存在 ABA 问题了。这就是 AtomicStampedReference 的解决方案。AtomicMarkableReference 跟 AtomicStampedReference 差不多,AtomicStampedReference 是使用 pair 的 int stamp 作为计数器使用,AtomicMarkableReference 的 pair 使用的是 boolean mark。 还是那个水的例子,AtomicStampedReference 可能关心的是动过几次, AtomicMarkableReference 关心的是有没有被人动过,方法都比较简单。
AtomicMarkableReference
原子更新带有标记位的引用类型。可以原子更新一个布尔类型的标记位和引用类型。构造方法是 AtomicMarkableReference(V initialRef,booleaninitialMark)。
LongAdder
LongAdder 的基本思路就是分散热点,将 value 值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行 CAS 操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的 long 值,只要将各个槽中的变量值累加返回。
面试必问并发安全问题
线程安全性
什么是线程安全性?我们可以这么理解,我们所写的代码在并发情况下使用时,总是能表现出正确的行为;反之,未实现线程安全的代码,表现的行为是不可预知的,有可能正确,而绝大多数的情况下是错误的。
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在调用代码中不需要任何额外的同步或者协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。
如何实现
1.线程封闭
实现好的并发是一件困难的事情,所以很多时候我们都想躲避并发。避免并发最简单的方法就是线程封闭。什么是线程封闭呢?
就是把对象封装到一个线程里,只有这一个线程能看到此对象。那么这个对象就算不是线程安全的也不会出现任何安全问题。
栈封闭
栈封闭是我们编程当中遇到的最多的线程封闭。什么是栈封闭呢?简单的说就是局部变量。多个线程访问一个方法,此方法中的局部变量都会被拷贝一份到线程栈中。所以局部变量是不被多个线程所共享的,也就不会出现并发问题。所以能用局部变量就别用全局的变量,全局变量容易引起并发问题。
TheadLocal
ThreadLocal 是实现线程封闭的最好方法。ThreadLocal 内部维护了一个 Map,Map 的 key 是每个线程的名称,而 Map 的值就是我们要封闭的对象。每个线程中的对象都对应着 Map 中一个值,也就是 ThreadLocal 利用 Map 实现了对象的线程封闭。
2.无状态的类
没有任何成员变量的类,就叫无状态的类,这种类一定是线程安全的。
3.让类不可变
让状态不可变,加 final 关键字,对于一个类,所有的成员变量应该是私有的,同样的只要有可能,所有的成员变量应该加上 final 关键字,但是加上 final,要注意如果成员变量又是一个对象时,这个对象所对应的类也要是不可变,才能保证整个类是不可变的。
但是要注意,一旦类的成员变量中有对象,上述的 final 关键字保证不可变并不能保证类的安全性,为何?因为在多线程下,虽然对象的引用不可变,但是对象在堆上的实例是有可能被多个线程同时修改的,没有正确处理的情况下,对象实例在堆中的数据是不可预知的。
4.加锁和 CAS
我们最常使用的保证线程安全的手段,使用 synchronized 关键字,使用显式锁,使用各种原子变量,修改数据时使用 CAS 机制等等。
死锁
是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁。
所以总结一下:
1、死锁是必然发生在多操作者(M>=2 个)争夺多个资源(N>=2 个,且 N<=M)才会发生这种情况。很明显,单线程自然不会有死锁,只有 B 一个去,不要 2 个,打十个都没问题;单资源呢?只有 13,A 和 B 也只会产生激烈竞争,打得不可开交,谁抢到就是谁的,但不会产生死锁。2、争夺资源的顺序不对,如果争夺资源的顺序是一样的,也不会产生死锁;3、争夺者对拿到的资源不放手。避免死锁常见的算法有有序资源分配法、银行家算法。
package cn.tulingxueyuan.safe.dl;
/**
*@author
*
*类说明:演示死锁的产生
*/
public class NormalDeadLock {
private static Object No13 = new Object();//第一个锁
private static Object No14 = new Object();//第二个锁
//第一个拿锁的方法
private static void zhouYuDo() throws InterruptedException {
String threadName = Thread.currentThread().getName();
synchronized (No13){
System.out.println(threadName + " get No13");
Thread.sleep(100);
synchronized (No14){
System.out.println(threadName + " get No14");
}
}
}
//第二个拿锁的方法
private static void monkeyDo() throws InterruptedException {
String threadName = Thread.currentThread().getName();
synchronized (No13){
System.out.println(threadName + " get No13");
Thread.sleep(100);
synchronized (No14){
System.out.println(threadName + " get No14");
}
}
}
//子线程,代表周瑜老师
private static class ZhouYu extends Thread{
private String name;
public ZhouYu(String name) {
this.name = name;
}
@Override
public void run() {
Thread.currentThread().setName(name);
try {
zhouYuDo();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
//主线程,代表Monkey老师
Thread.currentThread().setName("Monkey");
ZhouYu zhouYu = new ZhouYu("ZhouYu");
//System.out.println(ManagementFactory.getRuntimeMXBean().getName());
zhouYu.start();
monkeyDo();
}
}
如何解决
定位
要解决死锁,当然要先找到死锁,怎么找?
通过 jps 查询应用的 id,再通过 jstack id 查看应用的锁的持有情况
两种解决方式1、内部通过顺序比较,确定拿锁的顺序;2、采用尝试拿锁的机制。以下代码采用 显式锁实现
package cn.tulingxueyuan.safe.dl;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
*@author Mark老师
*类说明:演示普通账户的死锁和解决
*/
public class TryLock {
private static Lock No13 = new ReentrantLock();//第一个锁
private static Lock No14 = new ReentrantLock();//第二个锁
//先尝试拿No13 锁,再尝试拿No14锁,No14锁没拿到,连同No13 锁一起释放掉
private static void zhouYuDo() throws InterruptedException {
String threadName = Thread.currentThread().getName();
Random r = new Random();
while(true){
if(No13.tryLock()){
System.out.println(threadName +" get 13");
try{
if(No14.tryLock()){
try{
System.out.println(threadName +" get 14");
System.out.println("zhouYuDo do work------------");
break;
}finally{
No14.unlock();
}
}
}finally {
No13.unlock();
}
}
//Thread.sleep(r.nextInt(3));
}
}
//先尝试拿No14锁,再尝试拿No13锁,No13锁没拿到,连同No14锁一起释放掉
private static void monkeyDo() throws InterruptedException {
String threadName = Thread.currentThread().getName();
Random r = new Random();
while(true){
if(No14.tryLock()){
System.out.println(threadName +" get 14");
try{
if(No13.tryLock()){
try{
System.out.println(threadName +" get 13");
System.out.println("monkeyDo do work------------");
break;
}finally{
No13.unlock();
}
}
}finally {
No14.unlock();
}
}
//Thread.sleep(r.nextInt(3));
}
}
private static class ZhouYu extends Thread{
private String name;
public ZhouYu(String name) {
this.name = name;
}
public void run(){
Thread.currentThread().setName(name);
try {
zhouYuDo();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Thread.currentThread().setName("Monkey");
ZhouYu zhouYu = new ZhouYu("ZhouYu");
zhouYu.start();
try {
monkeyDo();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
其他安全问题
活锁
两个线程在尝试拿锁的机制中,发生多个线程之间互相谦让,不断发生同一个线程总是拿到同一把锁,在尝试拿另一把锁时因为拿不到,而将本来已经持有的锁释放的过程。
解决办法:每个线程休眠随机数,错开拿锁的时间。
线程饥饿
低优先级的线程,总是拿不到执行时间