CAS
原子类 java.util.concurrent.atomic
是什么
CAS
compare and swap的缩写,中文翻译比较并交换,实现并发算法时常用的一种技术
它包含三个操作数–内存位置、预期原值及更新值
执行CAS操作时,将内存位置的值与预期原值比较
如果相匹配,那么处理器会自动将该位置的值更新为新值;
如果不匹配,处理器不做任何操作,多个线程同时执行CAS操作只有一个会成功
硬件级别保证
CAS是JDK提供的非阻塞原子性操作,他通过硬件保证了比较-更新的原子性
他是非阻塞的且自身具有原子性,也就是说这玩意效率更高且通过硬件保证,说明这玩意更可靠
CAS是一条CPU的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg
执行cmpxch指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行cas操作,也就是说CAS的原子性实际上是CPU实现独占的,比起synchronized重量级锁,这里的排他时间短很多,所以在多线程情况下性能会比较好
i++线程不安全,那atomicInteger.getAndIncrement()
AtomicInteger类主要利用CAS(compare and swap)+ volatile 和 native 方法来保证原子操作,从而避免synchronized的高开销,执行效率大为提升
CAS并发原语体现在JAVA语言中的就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统原语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题
CAS 总结
只需要记住:CAS是靠硬件实现的从而在硬件层提升效率,底层还是交给硬件来保证原子性和可见性
实现方式是基于硬件平台的汇编指令,在intel的CPU中(X86机器上,使用的是汇编指令cmpxchg)指令
核心思想是:比较要更新变量的值V和预期值E(compare),相等才会将V的值设为新值N(swap),如果不相等自旋再来
CAS与自旋锁,借鉴CAS思想
是什么
CAS是实现自旋锁的基础,CAS利用CPU指令保证了操作的原子性,已达到锁的效果,至于自旋,字面意思,自己旋转。是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,当线程发现锁被占用是,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗cpu
OpenJDK源码里面查看下Unsafe.java
CAS是实现自旋锁的基础,自旋锁翻译成人话就是循环,一般是用一个无限循环实现。这样一来,一个无限循环中,执行一个CAS操作
当操作成功返回true时,循环结束
当返回false时,接着执行循环,继续尝试CAS操作,直到返回true
自旋锁
/**
* 题目:实现一个自旋锁,
* 自旋锁好处:循环比较获取没有类似wait的阻塞
*
* 通过CAS操作完成自旋锁,A线程先进来调用lock方法持有锁5秒钟,B随后进来发现当前有线程持有锁,所以只能通过自旋等待,直到A释放锁后B随后抢到
*/
public class SpinLock {
private AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void lock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t" + "----- come in");
while (!atomicReference.compareAndSet(null, thread)) {
}
}
public void unLock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t" + "----- task over ,unlock");
}
public static void main(String[] args) {
SpinLock spinLock = new SpinLock();
new Thread(() -> {
spinLock.lock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
spinLock.unLock();
},"a").start();
// 保证a 先启动
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
new Thread(() -> {
spinLock.lock();
spinLock.unLock();
},"b").start();
}
}
CAS 缺点
- 循环时间长开销大
- ABA问题 解决 -》版本号 使用AtomicSatmpedReference
ABA问题解决演示
@Getter
@Data
@AllArgsConstructor
class User{
String name;
int age;
}
public class Thread3 {
public static void main(String[] args) {
User u3 = new User("张三", 23);
AtomicStampedReference<User> stampedReference = new AtomicStampedReference<>(u3, 1);
System.out.println(stampedReference.getReference() + " \t " + stampedReference.getStamp());
User u4 = new User("李四", 23);
stampedReference.compareAndSet(u3,u4,stampedReference.getStamp(),stampedReference.getStamp()+1);
System.out.println(stampedReference.getReference() + " \t " + stampedReference.getStamp());
stampedReference.compareAndSet(u4,u3,stampedReference.getStamp(),stampedReference.getStamp()+1);
System.out.println(stampedReference.getReference() + " \t " + stampedReference.getStamp());
}
}
原子操作类之18罗汉增强
基本类型原子类
AtomicInteger
AtomicBoolean
AtomicLong
class AtomicDemo {
public AtomicInteger atomicInteger = new AtomicInteger();
public void add() {
atomicInteger.getAndAdd(1);
}
}
public class Thread3 {
public static Integer SIZE = 50;
public static void main(String[] args) throws InterruptedException {
AtomicDemo ad = new AtomicDemo();
CountDownLatch countDownLatch = new CountDownLatch(SIZE);
for (Integer i = 0; i < SIZE; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000; j++) {
ad.add();
}
}finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
System.out.println(ad.atomicInteger.get());
}
}
数组类型原子类
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(3);
AtomicIntegerArray atomicIntegerArray1 = new AtomicIntegerArray(new int[]{1, 2, 3, 4});
引用类型原子类
AtomicReference
AtomcStampedReference(多次)
AtomicMarkableReference(一次性)
对象的属性修改原子类
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
使用
更新的对象属性必须使用public volatile修饰符
因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置要更新的类和属性
面试:哪里用到了volatile?单例模式,AtomicReferenceFieldUpdater
class BankAccount{
public volatile int money = 0;
AtomicIntegerFieldUpdater<BankAccount> fieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money");
public void transMoney(BankAccount bankAccount)
{
fieldUpdater.getAndIncrement(bankAccount);
}
}
public class Thread3 {
public static Integer SIZE = 50;
public static void main(String[] args) throws InterruptedException {
BankAccount bankAccount = new BankAccount();
CountDownLatch countDownLatch = new CountDownLatch(SIZE);
for (Integer i = 0; i < SIZE; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000; j++) {
bankAccount.transMoney(bankAccount);
}
}finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
System.out.println(bankAccount.money);
}
}
/**
* 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能被初始化一次,只有一个线程操作成功
*/
class MyVar{
public volatile Boolean isInit = Boolean.FALSE;
AtomicReferenceFieldUpdater<MyVar, Boolean> referenceFieldUpdater =
AtomicReferenceFieldUpdater.newUpdater(MyVar.class,Boolean.class,"isInit");
public void init(MyVar myVar)
{
if (referenceFieldUpdater.compareAndSet(myVar,Boolean.FALSE,Boolean.TRUE)) {
System.out.println(Thread.currentThread().getName()+ " start init,need 2 seconds");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName()+ " init over");
}else{
System.out.println(Thread.currentThread().getName()+ " 已被初始化");
}
}
}
public class Thread3 {
public static void main(String[] args) {
MyVar myVar = new MyVar();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
myVar.init(myVar);
},""+i).start();
}
}
}
原子操作增强类原理深度解析
DoubleAccumulator
DoubleAdder
LongAccumulator
LongAddr
LongAddr累加操作
LongAccumulator 自定义连续操作
LongAccumulator longAccumulator = new LongAccumulator((left, right) -> left + right, 0);
longAccumulator.accumulate(1);
System.out.println(longAccumulator.get());
longAccumulator.accumulate(3);
System.out.println(longAccumulator.get());
性能测试
class ClickNumber {
int number = 0;
public synchronized void clickBySync()
{
number++;
}
AtomicLong atomicLong = new AtomicLong(0);
public void clickByAtomicLong(){
atomicLong.getAndIncrement();
}
LongAdder longAdder = new LongAdder();
public void clickLongAdder()
{
longAdder.increment();
}
LongAccumulator longAccumulator = new LongAccumulator((LongBinaryOperator) (left, right) -> left+right,0);
public void clickLongAcc()
{
longAccumulator.accumulate(1);
}
}
/**
* 50个线程,每个线程100w次,总点赞数出来
*/
public class Thread3 {
public static final int _1W = 10000;
public static final int threadNumber = 500;
public static void main(String[] args) throws InterruptedException {
ClickNumber clickNumber = new ClickNumber();
long startTime;
long endTime;
CountDownLatch countDownLatch1 = new CountDownLatch(threadNumber);
CountDownLatch countDownLatch2 = new CountDownLatch(threadNumber);
CountDownLatch countDownLatch3 = new CountDownLatch(threadNumber);
CountDownLatch countDownLatch4 = new CountDownLatch(threadNumber);
startTime = System.currentTimeMillis();
for (int i = 0; i < threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 0; j < _1W; j++) {
clickNumber.clickBySync();
}
}finally {
countDownLatch1.countDown();
}
}).start();
}
countDownLatch1.await();
endTime = System.currentTimeMillis();
System.out.println("sync "+(endTime-startTime));
startTime = System.currentTimeMillis();
for (int i = 0; i < threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 0; j < _1W; j++) {
clickNumber.clickByAtomicLong();
}
}finally {
countDownLatch2.countDown();
}
}).start();
}
countDownLatch2.await();
endTime = System.currentTimeMillis();
System.out.println("ato "+(endTime-startTime));
startTime = System.currentTimeMillis();
for (int i = 0; i < threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 0; j < _1W; j++) {
clickNumber.clickLongAdder();
}
}finally {
countDownLatch3.countDown();
}
}).start();
}
countDownLatch3.await();
endTime = System.currentTimeMillis();
System.out.println("addr "+(endTime-startTime));
startTime = System.currentTimeMillis();
for (int i = 0; i < threadNumber; i++) {
new Thread(() -> {
try {
for (int j = 0; j < _1W; j++) {
clickNumber.clickLongAcc();
}
}finally {
countDownLatch4.countDown();
}
}).start();
}
countDownLatch4.await();
endTime = System.currentTimeMillis();
System.out.println("acc "+(endTime-startTime));
}
}
LongAdder 为什这么快?
base变量:低并发,直接累加到该变量上
Cell[]:高并发,累加进各个线程自己的槽Cell[i]中
总结
AtomicLong
原理
CAS+自旋,incrementAndGet
场景
低并发下的全局计算
AtomicLong能保证并发情况下计数的准确性,通过CAS来解决并发问题
缺陷
高并发后性能急剧下降
why? AtomicLong 自旋锁会成为瓶颈
LongAdder
原理
CAS+Base+Cell数组分散
空间换时间并分撒了热点数据
场景
高并发下全局计算
缺陷
计算结果不是实时性的,可以保证最终一致性