8. CAS
- 原子类:Atomic
- 没有CAS之前:多线程环境不使用原子类保证线程安全i++(基本数据类型),可以使用synchronized,但是很重
- 有CAS之后:
- 使用AtomicInteger.getAndIncrement这样的API,保证原子性同时不使用synchronized
- 类似我们的乐观锁
- CAS是什么:compare and swap的缩写,中文翻译成比较并交换,实现并发算法时常用到的一种技术。
0. 它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。- 它包含三个操作数—一内存位置、预期原值及更新值。
- 执行CAS操作的时候,将内存位置的值与预期原值比较:
- 如果相匹配,那么处理器会自动将该位置值更新为新值,
- 如果不匹配,处理器不做任何操作,多个线程同时执行CAS操作只有一个会成功。
- 匹配失败就会持续抢夺,就是自旋
- CAS原理:
- CAS有3个操作数,位置内存值V,旧的预期值A,要修改的更新值B。
- 当且仅当旧的预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做或重来
- 当它重来重试的这种行为成为–自旋!
- 简单讲,存一个旧值,比较内存值是否和旧值相等,相等则认为可以去修改,否则失败,重新取数进行操作
- 代码:
AtomicInteger atomicInteger = new AtomicInteger(5); System.out.println(atomicInteger.get()); System.out.println(atomicInteger.compareAndSet(5, 308)+"\t"+atomicInteger.get()); // true 308 System.out.println(atomicInteger.compareAndSet(5, 3333)+"\t"+atomicInteger.get()); // false 308
- 底层原理:
- CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。
- 它是非阻塞的且自身具有原子性,也就是说这玩意效率更高且通过硬件保证,说明这玩意更可靠。
- CAS是一条CPU的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg。
- 执行cmpxchg指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行cas操作,也就是说CAS的原子性实际上是CPU实现独占的,比起用synchronized重量级锁,这里的排他时间要短很多,所以在多线程情况下性能会比较好。
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
- 本质:unsafe类
- UnSafe类:是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。
- 注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务
- 变量valueOffset,表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。
- 变量value用volatile修饰,保证了多线程之间的内存可见性。
- 为什么getAndIncrement可以保证原子性:
- CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇
编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。// AtomicInteger.java 文件 public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } // unsafe.class 文件 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } // 这个方法也在unsafe里面,但是是native的,代码没法直接看到,需要openJDK源码查看 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
- CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇
- 底层汇编:unsafe.cpp文件:
- 先想办法拿到变量value在内存中的地址,根据偏移量value Offset,计算 value 的地址
- 调用 Atomic 中的函数 cmpxchg来进行比较交换,其中参数x是要交换的值 e是要比较的值
- cas成功,返回期望值e,等于e,此方法返回true
- cas失败,返回内存中的value值,不等于e,此方法返回false
- JDK提供的CAS机制,在汇编层级会禁止变量两侧的指令优化,然后使用cmpxchg指令比较并更新变量值(原子性)
- 在不同的系统下会调用不同的cmpxchg方法的重载函数
- 原子引用:自定义的原子类型,AtomicReference<T>:
class User { String userName; int age; } public static void main(String[] args){ User z3 = new User("z3",24); User li4 = new User("li4",26); AtomicReference<User> atomicReferenceUser = new AtomicReference<>(); atomicReferenceUser.set(z3); System.out.println(atomicReferenceUser.compareAndSet(z3,li4)+"\t"+atomicReferenceUser.get().toString()); System.out.println(atomicReferenceUser.compareAndSet(z3,li4)+"\t"+atomicReferenceUser.get().toString()); }
- 自旋锁
- CAS 是实现自旋锁的基础,CAS 利用 CPU 指令保证了操作的原子性, 以达到锁的效果,至于自旋呢,看字面意思也很明白,自己旋转。是指尝试获取锁的线程不会立即阻寨,而是采用循环的方式去尝试获取锁,当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU
- 好处:避免wait类似的阻塞
public class SpinLockDemo { AtomicReference<Thread> atomicReference = new AtomicReference<>(); public void MyLock() { System.out.println(Thread.currentThread().getName()+"\t"+"---come in"); while(!atomicReference.compareAndSet(null,Thread.currentThread())) { } System.out.println(Thread.currentThread().getName()+"\t"+"---持有锁成功"); } public void MyUnLock() { atomicReference.compareAndSet(Thread.currentThread(),null); System.out.println(Thread.currentThread().getName()+"\t"+"---释放锁成功"); } public static void main(String[] args) { SpinLockDemo spinLockDemo = new SpinLockDemo(); new Thread(() -> { spinLockDemo.MyLock(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } spinLockDemo.MyUnLock(); },"t1").start(); new Thread(() -> { spinLockDemo.MyLock(); spinLockDemo.MyUnLock(); },"t2").start(); } }
- CAS两大问题:
- 如果CAS一直不成功,会给CPU带来巨大开销,循环时间长
- ABA问题:由于只检查结果,所以可能不是之前的结果了,但是依然认为可以修改。尽管CAS操作是成功的,但是在这个过程中是不安全的。
- 版本号:戳记流水 —— AtomicStampedReference
class Book{ private int id; private String bookName; } public static void main(String[] args) { Book javabook = new Book(1,"javaBook"); AtomicStampedReference<Book> stampedReference = new AtomicStampedReference<>(javabook,1); System.out.println(stampedReference.getReference()+"\t"+stampedReference.getStamp()); // 修改: Book mysqp = new Book(2,"mysql"); boolean b = stampedReference.compareAndSet(javabook, mysqp, stampedReference.getStamp(), stampedReference.getStamp() + 1); System.out.println(b+"\t"+stampedReference.getReference()+"\t"+stampedReference.getStamp()); }
- ABA问题实战:
// ABA问题复现:不使用AtomicStampedReference new Thread(() -> { atomicInteger.compareAndSet(100,101); atomicInteger.compareAndSet(101,100); },"t1").start(); //暂停毫秒 try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { boolean b = atomicInteger.compareAndSet(100, 20210308); System.out.println(Thread.currentThread().getName()+"\t"+"修改成功否:"+b+"\t"+atomicInteger.get()); },"t2").start(); // 使用版本控制 new Thread(() -> { int stamp = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName()+"\t"+"---默认版本号: "+stamp); //让后面的t4获得和t3一样的版本号,都是1,好比较 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } atomicStampedReference.compareAndSet(100,101,stamp,stamp+1); System.out.println(Thread.currentThread().getName()+"\t"+"---1次版本号: "+atomicStampedReference.getStamp()); atomicStampedReference.compareAndSet(101,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1); System.out.println(Thread.currentThread().getName()+"\t"+"---2次版本号: "+atomicStampedReference.getStamp()); },"t3").start(); new Thread(() -> { int stamp = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName()+"\t"+"---默认版本号: "+stamp); //上前面的t3完成ABA问题 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } boolean result = atomicStampedReference.compareAndSet(100, 20210308, stamp, stamp + 1); System.out.println(Thread.currentThread().getName()+"\t"+"---操作成功否:"+result+"\t"+atomicStampedReference.getStamp()+"\t"+atomicStampedReference.getReference()); },"t4").start();
9. 原子类
是什么:atomic
- AtomicBoolean
- Atomiclnteger
- AtomicIntegerArray
- AtomiclntegerFieldUpdater
- AtomicLong
- AtomicLongArray
- AtomicLongFieldUpdater
- AtomicMarkableReference
- AtomicReference
- AtomicReferenceArray
- AtomicReferenceFieldUpdater
- AtomicStampedReference
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
9.1 原子类基础类型
- 基本类型原子类
- 有哪些:
- AtomicBoolean
- Atomiclnteger
- AtomicLong
- 常用API:
- public final int get() //获取当前的值
- public final int getAndSet(int newValuey/获取当前的值,并设置新的值
- public final int getAndlncrement(y/获取当前的值,并自增
- public final int getAndDecrement() //获取当前的值,并自减
- public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
- boolean compareAndSet(int expect, int update) 如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
- countDownLatch的使用:
- 场景:50个线程,对atomicInteger类型的资源进行++操作,执行1000次,要在执行完成的时候立刻输出结果:
-
public static final int SIEZ_ = 50; public static void main(String[] args) throws InterruptedException{ MyNumber myNumber = new MyNumber(); CountDownLatch countDownLatch = new CountDownLatch(SIEZ_); for (int i = 1; i <=SIEZ_; i++) { new Thread(() -> { try { for (int j = 1 ;j <=1000; j++) { myNumber.addPlusPlus(); } }catch (Exception e){ e.printStackTrace(); }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } //try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t"+"---result : "+myNumber.atomicInteger.get()); }
- 有哪些:
- 数组类型原子类
- 有哪些:
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
- 方法:
- 构造方法,有两类参数重载,一种传一个int 代表长度,另一种直接传一个int数组进去
- 有哪些:
- 引用类型原子类
- AtomicReference:原子引用:(经典应用SpinLockDemo:自旋锁)
- AtomicStampedReference:戳记流水原子引用,解决ABA问题
- 解决修改过几次
- 状态戳原子引用:
- 常用方法:
- int getStamp():获取版本号
- boolean compareAndSet 对比值和版本号,更新值和版本号
- AtomicMarkableReference:标识标记原子引用
- 解决是否修改过
- 状态戳(true/false)原子引用
- 常用方法:
- boolean isMarked():获取是否被修改过
- compareAndSet(期望原值,修改值,marked,!marked)
- getReference():获取值
-
static AtomicMarkableReference atomicMarkableReference = new AtomicMarkableReference(100,false); public static void main(String[] args) { new Thread(() -> { boolean marked = atomicMarkableReference.isMarked(); System.out.println(Thread.currentThread().getName()+"\t"+"---默认修改标识:"+marked); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } atomicMarkableReference.compareAndSet(100,101,marked,!marked); },"t1").start(); new Thread(() -> { boolean marked = atomicMarkableReference.isMarked(); System.out.println(Thread.currentThread().getName()+"\t"+"---默认修改标识:"+marked); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } boolean b = atomicMarkableReference.compareAndSet(100, 20210308, marked, !marked); System.out.println(Thread.currentThread().getName()+"\t"+"---操作是否成功:"+b); System.out.println(Thread.currentThread().getName()+"\t"+atomicMarkableReference.getReference()); System.out.println(Thread.currentThread().getName()+"\t"+atomicMarkableReference.isMarked()); },"t2").start(); }
- 后两者对比:
- 几乎是兄弟俩,很相似
- Stamped,version版本号,每次+1;Marked将状态简化为true/false,有无修改过
- 对象的属性修改原子类
- 有哪些:
- AtomiclntegerFieldUpdater:基于反射的实用程序,可对指定类的指定 volatile int字段进行原子更新。
- AtomicLongFieldUpdater:原子更新对象中Long类型字段的值
- AtomicReferenceFieldUpdater:原子更新引用类型字段的值
- 使用目的:
- 以一种线程安全的方式操作非线程安全对象内的某些字段
- 更加细粒度的原子更新
- 使用要求:
- 更新的对象属性必须使用 public volatile 修饰符。
- 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
- 相比synchronized,对Field的更新更像局部微创小手术
- 面试官问:哪里用volatile:
- 单例模式的懒汉式写法
- 对象属性修改的原子类:AtomicReferenceFieldUpdater
- 代码实操:字段级别的原子更新案例:
- 案例一:Integer类型属性更新
class BankAccount { String bankName = "ccb"; //以一种线程安全的方式操作非线程安全对象内的某些字段 //1 更新的对象属性必须使用 public volatile 修饰符。 public volatile int money = 0; //2 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须 // 使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。 AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money"); public void transfer(BankAccount bankAccount) { fieldUpdater.incrementAndGet(bankAccount); } } public class AtomicIntegerFieldUpdaterDemo{ public static void main(String[] args) throws InterruptedException{ BankAccount bankAccount = new BankAccount(); CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 1; i <=10; i++) { new Thread(() -> { try { for (int j=0;j<1000;j++){ bankAccount.transfer(bankAccount); } } finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } // 等待运行完成 countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t"+"---bankAccount: "+bankAccount.money); } }
- 案例二:引用类型案例。初始化一个引用属性,只初始一次
class MyVar { public volatile Boolean isInit = Boolean.FALSE; AtomicReferenceFieldUpdater<MyVar,Boolean> referenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class, Boolean.class,"isInit"); public void init(MyVar myVar) { if (referenceFieldUpdater.compareAndSet(myVar,Boolean.FALSE,Boolean.TRUE)) { System.out.println(Thread.currentThread().getName()+"\t"+"---start init"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t"+"---end init"); }else { System.out.println(Thread.currentThread().getName()+"\t"+"---抢夺失败,已经有线程在修改中"); } } } /** * 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次 */ public class AtomicReferenceFieldUpdaterDemo{ public static void main(String[] args){ MyVar myVar = new MyVar(); for (int i = 1; i <=5; i++) { new Thread(() -> { myVar.init(myVar); },String.valueOf(i)).start(); } } }
- 案例一:Integer类型属性更新
- 有哪些:
9.2 原子操作增强类原理深度解析
- java才有的这些
- 有哪些:
- DoubleAccumulator:一个或多个变量共同维护使用提供的函数更新的运行 double值。
- DoubleAdder:一个或多个变量共同维持最初的零和 double总和。
- LongAccumulator:一个或多个变量共同维护使用提供的函数更新的运行 Long值。
- LongAccumulator提供了自定义的函数操作
- LongAdder:一个或多 个变量共同维持最初为零的总和为 Long
- LongAdder只能用来计算加法,且从零开始计算
- 阿里面试题:推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观锁的重试次数)
- 热点商品点赞计算器,点赞数加加统计,不要求实时精确
- 一个很大的List,里面都是int 类型,如何实现加加,说说思路
- 使用场景:
- 当多个线程更新用于收集统计信息但不用于细粒度同步控制的目的的公共和时,此类通常优于AtomicLong。在低更新争用下,这两个类具有相似的特征。但在高争用的情况下,这一类的预期吞吐量明显更高,但代价是空间消耗更高。
- 不要求精准实时的高并发大数据统计非常合适。
- LongAdder的常用API:
- void add(long x):将当前的value加x。
- void increment( ):将当前的value加1。
- void decrement( ):将当前的value减1。
- Iong sum( ): 返回当前值。特别注意,在没有并发更新value的情况下,sum会返回一个精确值,在存在并发的情况下,sum不保证返回精确值。
- void reset( ): 将value重置为0,可用于普代重新new一个LongAdder,但此方法只可以在没有并发更新的
情况下使用。 - long sumThenReset( ):获取当前value,井将value重置为0。
- 代码演示,一起演示long的两个原子类
- API演示
- LongAdder只能用来计算加法,且从零开始计算
- LongAccumulator提供了自定义的函数操作
LongAdder longAdder = new LongAdder();//只能做加法 longAdder.increment(); longAdder.increment(); longAdder.increment(); // longValue() 等价于sum操作 System.out.println(longAdder.longValue()); // 3 LongAccumulator longAccumulator = new LongAccumulator((left, right) -> left - right, 100); longAccumulator.accumulate(1);// 相当于100 - 1 longAccumulator.accumulate(2);// 相当于99 - 2 longAccumulator.accumulate(3);// 相当于97 - 3 System.out.println(longAccumulator.longValue()); // 94
- LongAdder高性能对比Code演示:50个线程,每个点赞100w次,总数统计出来
结果: ----costTime: 2131 毫秒 add_Synchronized 50000000 ----costTime: 881 毫秒 add_AtomicInteger 50000000 ----costTime: 1005 毫秒 add_AtomicLong 50000000 ----costTime: 121 毫秒 add_LongAdder 50000000 ----costTime: 168 毫秒 add_LongAccumulator 50000000 public class LongAdderCalcDemo { public static final int SIZE_THREAD = 50; public static final int _1W = 10000; public static void main(String[] args) throws InterruptedException { ClickNumber clickNumber = new ClickNumber(); long startTime; long endTime; CountDownLatch countDownLatch1 = new CountDownLatch(SIZE_THREAD); CountDownLatch countDownLatch2 = new CountDownLatch(SIZE_THREAD); CountDownLatch countDownLatch3 = new CountDownLatch(SIZE_THREAD); CountDownLatch countDownLatch4 = new CountDownLatch(SIZE_THREAD); CountDownLatch countDownLatch5 = new CountDownLatch(SIZE_THREAD); //======================== startTime = System.currentTimeMillis(); for (int i = 1; i <=SIZE_THREAD; i++) { new Thread(() -> { try { for (int j = 1; j <=100 * _1W; j++) { clickNumber.add_Synchronized(); } }catch (Exception e){ e.printStackTrace(); }finally { countDownLatch1.countDown(); } },String.valueOf(i)).start(); } countDownLatch1.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_Synchronized"+"\t"+clickNumber.number); startTime = System.currentTimeMillis(); for (int i = 1; i <=SIZE_THREAD; i++) { new Thread(() -> { try { for (int j = 1; j <=100 * _1W; j++) { clickNumber.add_AtomicInteger(); } }catch (Exception e){ e.printStackTrace(); }finally { countDownLatch2.countDown(); } },String.valueOf(i)).start(); } countDownLatch2.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_AtomicInteger"+"\t"+clickNumber.atomicInteger.get()); startTime = System.currentTimeMillis(); for (int i = 1; i <=SIZE_THREAD; i++) { new Thread(() -> { try { for (int j = 1; j <=100 * _1W; j++) { clickNumber.add_AtomicLong(); } }catch (Exception e){ e.printStackTrace(); }finally { countDownLatch3.countDown(); } },String.valueOf(i)).start(); } countDownLatch3.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_AtomicLong"+"\t"+clickNumber.atomicLong.get()); startTime = System.currentTimeMillis(); for (int i = 1; i <=SIZE_THREAD; i++) { new Thread(() -> { try { for (int j = 1; j <=100 * _1W; j++) { clickNumber.add_LongAdder(); } }catch (Exception e){ e.printStackTrace(); }finally { countDownLatch4.countDown(); } },String.valueOf(i)).start(); } countDownLatch4.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_LongAdder"+"\t"+clickNumber.longAdder.longValue()); startTime = System.currentTimeMillis(); for (int i = 1; i <=SIZE_THREAD; i++) { new Thread(() -> { try { for (int j = 1; j <=100 * _1W; j++) { clickNumber.add_LongAccumulator(); } }catch (Exception e){ e.printStackTrace(); }finally { countDownLatch5.countDown(); } },String.valueOf(i)).start(); } countDownLatch5.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_LongAccumulator"+"\t"+clickNumber.longAccumulator.longValue()); } }
- API演示
9.3 源码、原理分析
- 18罗汉中的两个隐藏类:Number、Striped64。LongAdder是Striped64的子类
- LongAdder性能好和Striped设计有关
- Striped64:
- Striped64有几个比较重要的成员函数 ——最重要2个
- 有一个静态内部类:Cell,单元格类
- 四个变量:
- NCPU:运行主机的CPU核心数
- Transient volatile Cell[ ] :单元格数组
- Transient volatile long base :
- Transient volatile int cellsBusy :自旋锁
- Cell:单元格类
LongAdder为什么这么快:原理
- longAdder之前:
- Atomic & CAS : 有且只有一个线程可以访问资源,其余在外面自旋。
- 数量少的时候,运行空转,但是数量很多的时候,大量的空转导致性能急剧下降
- 解决思路:化整为零 -> 分散热点
- longAdder之后:
- 当并发量低的时候,也只有一个线程可以访问资源,其余进行自旋。cas的访问long base变量
- 当并发量高的时候,会新建cell单元格,都可以进行访问,相当于cas的分散窗口
- 为什么快:一句话,分散热点
- 统计求和时:result = base + sum(Cell[ ])
- 总结:
LongAdder的的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
V a l u e = B a s e + ∑ i = 0 n Cell [ i ] Value = Base + \sum_{i=0}^{n} \text{Cell}[i] Value=Base+i=0∑nCell[i]
并发量低时直接cas base ;并发量高时累加进各个单元格再求和
源码解读
- 小总结:
- LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base逃行操作,当出现竞争关系时则是采用化整为零分散热点的做法,用空间换时间用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和base都加起来作为最终结果。
- LongAdder.increasement(): 三个组成部分:
- add(1L)
- longAccumulate
- sum
- add方法:
- 源码:
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
- 解读:
- as 表示 cells 引用,是striped64 中的cells 数组属性
- b表示获取的 base 值,&striped64中的base属性
- v表示期望值,是当前线程hash到的Cell中存储的值
- m表示 cells 数组的长度减1,hash时作为掩码使用
- a 表示当前线程命中(hash运行得到)的 cell单元格
- 代码解读:
- 第三行:判断cells是否为null,即是否进行了扩容,如果未扩容,由于是或操作,则进行下一步操作,直接对base进行cas操作,如果成功,则加上取反的结果,if失败,跳出了后面的代码。
- 还是第三行:当并发量高之后,casbase开始失败,取反之后if条件达成,触发了后续的扩容操作。
- 第四行:定义一个boolean变量表示是否无冲突,默认初始值为不冲突
- 第五行:if中进行连续的或运算判断是否需要执行 longAccumulate(x, null, uncontended); 代码,即进行Cell扩容
- 第一个判断:as 是否为空,为空则需要扩容
- 第二个判断:as不为空之后,长度是不是没开辟
- 第三个判断:当前线程所在的ceLL为空, 说明当前线程还沒有更新过ceLL,应初始化 -1cell
- 第四个判断:如果Cells表非空,且前线程映射的槽非空,CAS更新Cel的值,成功则返回,否则,uncontended设为false,调用longAccumulate。扩容逻辑为翻倍扩容,一直为2次幂
- add方法思路归纳:
- 最初无竞争时只更新base;
- 如果更新base失败后,首次新建一个Cell[ ]数组
- 当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[ ]扩容
- 小总结:
- 判断之后也进行操作,用结果作为返回值,激活下一步的操作
- 源码:
- longAccumulate 方法:
- 参数:
1. long X 需要增加的值,一般默认都是1
2. LongBinaryOperator fn 默认传递的是null
3. wasUncontended竞争标识,如果是false则代表有竞争。只有cells初始化之后,并且当前线程CAS竞争修改失败,才会是false - 前置知识:
- Striped64中一些变量或者方法的定义:
- base:类似于AtomicLong中全局的value值。在没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
- collide:表示扩容意向,false一定不会扩容,true可能会扩容。
- cellsBusy:初始化cells或者扩容cells需要获取锁,0:表示无锁状态 1:表示其他线程已经持有了锁
- casCellsBusy():通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
- NCPU:当前计算机CPU数量,Cell数组扩容时会使用到
- getProbe():获取当前线程的hash值,底层调用unsafe获取一个随机的线程工号
- advanceProbe():重置当前线程的hash值
- 源码:整体结构分为3大块
- 先分配线程号
- for(;;)大自旋
1. cells已经被初始化
2. cells没有加锁且没有初始化,则尝试对他进行加锁,并初始化cells数组
3. 兜底,cells正在进行初始化,则尝试直接在base上进行累加;
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // 扩容意愿默认为false for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // cells已经被初始化,并且长度不为0 if ((a = as[(n - 1) & h]) == null) { // 对应位置还没赋值 if (cellsBusy == 0) { // 双端获取锁的第一次 Cell r = new Cell(x); // 新建一个cell单元对象 if (cellsBusy == 0 && casCellsBusy()) { // 双端获取锁第二次,同时对cellbusy加锁 boolean created = false; // 创建默认值为false try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && // 检查cell不为空 (m = rs.length) > 0 && // 检查cell长度大于0 ,以上两步也属于双端检查 rs[j = (m - 1) & h] == null) { // 检查对应坑位是否为空,也是双端检查 rs[j] = r; // 将刚才新建的cell对象赋值到坑位j,也就是h对m-1与运算 created = true; // 修改create值为true } } finally { cellsBusy = 0; // 使用finally释放锁,释放锁没有使用cas操作 } if (created) // 如果创建成功则退出循环,否则continue在此尝试创建 break; continue; // Slot is now non-empty }// if (cellsBusy == 0 && casCellsBusy()) } // if (cellsBusy == 0) 如果获取锁失败,则会直接执行下面的操作 collide = false; // 扩容已经成功或者获取锁失败,扩容意愿赋值false } else if (!wasUncontended) // CAS already known to fail // wasUncontended就是add中的是否冲突的值,这里用于判断,如果不冲突为false,也就是说冲突,则就会触发if条件判断 wasUncontended = true; // 这里没懂 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 如果直接cas修改,fn是运算符,为空是默认加法运算,如果运算成功,则跳出循环 else if (n >= NCPU || cells != as) // n是cells长度,如果n大于等于cpu核心数,或cells不等于as,扩容意愿就设置为false; collide = false; // At max size or stale else if (!collide) // 这里判断如果扩容意愿为false,则修改为true collide = true; else if (cellsBusy == 0 && casCellsBusy()) { // 如果可以获取锁,那么就加锁 try { if (cells == as) { // 双端检查 Cell[] rs = new Cell[n << 1]; // n是cells的长度,直接扩容为2倍长度 for (int i = 0; i < n; ++i) // 把原值拷贝进来 rs[i] = as[i]; // 拷贝值 cells = rs; // 引用传递 } } finally { cellsBusy = 0; // finally释放锁 } collide = false; continue; // Retry with expanded table }// else if串结束 h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells没有加锁且没有初始化,则尝试对他进行加锁,并初始化cells数组 boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : //兜底,cells正在进行初始化,则尝试直接在base上进行累加; fn.applyAsLong(v, x)))) break; // Fall back on using base } }
- 参数:
- 已经有数组的情况下:
- 多个线程同时命中一个cell的竞争
- 初始化建设数组的情况:
扩容的之后直接新建两个cellelse if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; // 新建长度为2的数组 rs[h & 1] = new Cell(x); // 对新建数组的对应的坑位赋值 cells = rs; // 将rs赋值给cells init = true; // 初始化成功标记 } } finally { cellsBusy = 0; // 把锁置为 } }
对cell == as 进行双端检查,高并发思想。 - 源码阅读总结:
- if中一边判断一边干活,利用或运算的特性(如果得到一个true则跳过后面的判断),尽可能避免了层层if嵌套,其中运算的步骤是否成功也返回if进行判断,如果此步骤运算不成功,则执行下面的运算链路。
- 大量使用双端检查,对并发修改的资源变量和锁对象,都使用双端检查,即获取到锁之后再进行一遍检查,避免第一次判断和加锁之间,临界资源发生变化。保证高并发情况下的安全
- 使用类似计算机网络计算子网络地址的操作,用值和长度-1去与运算,得出hash值
- 加锁都是在finally中去释放锁,避免执行异常导致锁无法释放