原子操作类之18罗汉增强

news2024/11/18 10:11:02

原子操作类之18罗汉增强

是什么

都是java.util.concurrent.atomic包下的

有红框圈起来的,也有蓝框圈起来的,为什么?

image-20221203201045308

  • 阿里巴巴Java开发手册

image-20221203201123663

  • 为什么说18罗汉增强,却只有16个

image-20221203201202994

再分类

基本类型原子类

AtomicInteger
AtomicBoolean
AtomicLong

常用API

public final int get()
public final int getAndSet(int new Value)
public final int getAndIncrement()
public final int getAndDecrement()
public final int getAndAdd(int delta)
public comapreAndSet(int expect,int update)//如果

Case-CountDownLatch

  • 案例
class MyNumber {
    AtomicInteger atomicInteger = new AtomicInteger();

    public void addPlusPlus() {
        atomicInteger.getAndIncrement();
    }
}

public class AtomicIntegerDemo {
    public static final int SIZE = 50;

    public static void main(String[] args) {
        MyNumber myNumber = new MyNumber();
        for (int i = 1; i <= SIZE; i++) {
            new Thread(() -> {
                for (int j = 1; j <= 1000; j++) {
                    myNumber.addPlusPlus();
                }
            }, String.valueOf(i)).start();
        }
        //前面的计算还没计算玩,main线程就去取值了导致取不到最终计算结果
        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger);
    }
}
//本来应该是50000
//1试-main  result: 39000
//2试-main  result: 40178
//?是不是我们的程序有问题?
class MyNumber {
    AtomicInteger atomicInteger = new AtomicInteger();

    public void addPlusPlus() {
        atomicInteger.getAndIncrement();
    }
}
//方法一(不推荐,做做Demo还行)
public class AtomicIntegerDemo {
    public static final int SIZE = 50;
    public static void main(String[] args) {
        MyNumber myNumber = new MyNumber();
        for(int i = 1;i <= SIZE;i ++){
            new Thread(() -> {
                for(int j = 1;j <= 1000;j ++){
                    myNumber.addPlusPlus();
                }
            },String.valueOf(i)).start();
        }
        //让线程等待2秒
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //前面让线程等待两秒,如果两秒之内计算完毕,则main线程取到的计算结果是正确的,
        // 但如果2秒之后还是没有计算完毕,则可能取不到正确的计算结果
        System.out.println(Thread.currentThread().getName()+"\t"+"result: "+myNumber.atomicInteger);
    }
}
class MyNumber {
    AtomicInteger atomicInteger = new AtomicInteger();

    public void addPlusPlus() {
        atomicInteger.getAndIncrement();
    }
}

//方法二-减法计数器CountDownLatch
public class AtomicIntegerDemo {
    public static final int SIZE = 50;

    public static void main(String[] args) throws InterruptedException {
        MyNumber myNumber = new MyNumber();
        //减法计数器:创建时定义线程个数,没当线程允许结束一个就减去一个
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);
        for (int i = 1; i <= SIZE; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 1000; j++) {
                        myNumber.addPlusPlus();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }, String.valueOf(i)).start();
        }
        //如果当前线程计数大于0,则阻塞等待;反之则放行
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger);
    }
}
// main    result: 50000

数组类型原子类

基本原理同上

AtomicIntegerArray
AtomicLongArray
AtomicRreferenceArray
  • 案例

    public class AtomicIntegerArrayDemo {
        public static void main(String[] args) {
            //创建了一个int数组,每个元素默认值为0
            AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);//0 0 0 0 0
            //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
            //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});//1 2 3 4 5
    
            //遍历获取每个元素的值
            for (int i = 0; i < atomicIntegerArray.length(); i++) {
                System.out.println(atomicIntegerArray.get(i));
            }
            System.out.println();
            System.out.println();
            System.out.println();
            int tmpInt = 0;
            //获取下标为0的元素的值,并设置为1122
            tmpInt = atomicIntegerArray.getAndSet(0, 1122);
            System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0)); // 0 1122
            //获取下标为1的元素的值,并让其自增1
            atomicIntegerArray.getAndIncrement(1);
            atomicIntegerArray.getAndIncrement(1);
            tmpInt = atomicIntegerArray.getAndIncrement(1); // 2
            System.out.println(tmpInt + "\t" + atomicIntegerArray.get(1));//2 3
        }
    }
    

引用类型原子类

AtomicReference
AtomicStampedReference
AtomicMarkableReference

AtomicReference 可以带泛型``AtomicReference `

AtomicStampedReference 带版本号以防CAS中的ABA问题,携带版本号的引用类型原子类,可以解决ABA问题。解决修改过几次的问题。

AtomicMarkableReference类似于上面的 ,但解决一次性问题

构造方法AtomicMarkableReference(V initialRef, boolean initialMark)

原子更新带有标记位的引用类型对象

解决是否修改过,它的定义就是将状态戳简化true|false,类似一次性筷子

//来个案例
public class AtomicMarkableReferenceDemo {

    static AtomicMarkableReference markableReference = new AtomicMarkableReference(100, false);

    public static void main(String[] args) {
        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + "\t" + "默认标识" + marked);
            //暂停1秒钟线程,等待后面的T2线程和我拿到一样的模式flag标识,都是false
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            markableReference.compareAndSet(100, 1000, marked, !marked);
        }, "t1").start();

        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + "\t" + "默认标识" + marked);
            //这里停2秒,让t1先修改,然后t2试着修改
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean t2Result = markableReference.compareAndSet(100, 2000, marked, !marked);
            //t2取比较期望值的时候,已经被t1从100改为了1000,marked也被修为了true,此时t2修改失败
            System.out.println(Thread.currentThread().getName() + "\t" + "t2线程result--" + t2Result);
            System.out.println(Thread.currentThread().getName() + "\t" + markableReference.isMarked());
            System.out.println(Thread.currentThread().getName() + "\t" + markableReference.getReference());

        }, "t2").start();
    }
}
//允许结果
//t1	默认标识false
//t2	默认标识false
//t2	t2线程result--false
//t2	true
//t2	1000

对象的属性修改原子类

关键词FieldUpdater

AtomicIntegerFieldUpdater//原子更新对象中int类型字段的值
AtomicLongFieldUpdater//原子更新对象中Long类型字段的值
AtomicReferenceFieldUpdater//原子更新引用类型字段的值

更加细粒度范围内的原子更新

image-20221211205320177

使用目的

  • 以一种线程安全带 方式操作非线程安全对象内的某些字段

举个例子(它是更加细粒度的/影像某个字段,而不用锁住整个对象)

image-20221211205523674

使用要求

  • 更新的对象属性必须使用public volatile修饰符

因为对象的属性修改类型原子类都是抽象类所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。

Case

AtomicIntegerFieldUpdater-这个针对int类型

class BankAccount {
    String bankName = "CCB";
    public volatile int money = 0;//条件一

    //synchronized版本
//    public synchronized void add(){
//        money++;
//    }
    //AtomicIntegerFieldUpdater版本 
    //param1:第一个参数是字段所属类
    //param2:指定该类中线程安全的字段
    AtomicIntegerFieldUpdater<BankAccount> fieldUpdater =
            AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");//只限制了money这个字段,条件二

    public void transMoney(BankAccount bankAccount) {
        fieldUpdater.getAndIncrement(bankAccount);
    }

}

public class AtomicIntegerFieldUpdaterDemo {
    public static void main(String[] args) throws InterruptedException {
        BankAccount bankAccount = new BankAccount();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 1000; j++) {
                        // bankAccount.add();
                        bankAccount.transMoney(bankAccount);
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + bankAccount.money);
    }
}
//main  result: 10000

AtomicReferenceFieldUpdater-适用度更广

//比如这个案例中是针对boolean类型的
class MyVar {
    public volatile Boolean isInit = Boolean.FALSE;
    //param1 第一个参数是字段所属类
    //param2 类型
    //param3 字段名
    AtomicReferenceFieldUpdater<MyVar, Boolean> referenceFieldUpdater =
            AtomicReferenceFieldUpdater.newUpdater(MyVar.class, Boolean.class, "isInit");

    public void init(MyVar myVar) {
        //referenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE) 如果初始值为false,则将值设置为true
        if (referenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)) {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----start init,needs 3 seconds");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "\t" + "-----over init");
        } else {
            System.out.println(Thread.currentThread().getName() + "\t" + "抱歉,已经有其他线程进行了初始化");
        }
    }
}

public class AtomicReferenceFieldUpdaterDemo {
    public static void main(String[] args) {
        MyVar myVar = new MyVar();
        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                //线程第一次进入之后,将isInit的值修改为true
                //后续referenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)
                //均返回false
                myVar.init(myVar);
            }, String.valueOf(i)).start();
        }
    }
}
//1  -----start init,needs 3 seconds
//5  抱歉,已经有其他线程进行了初始化
//4  抱歉,已经有其他线程进行了初始化
//2  抱歉,已经有其他线程进行了初始化
//3  抱歉,已经有其他线程进行了初始化
//1  -----over init

面试

面试官问你:你在哪里用了volatile?

在AtomicReferenceFieldUpdater中,因为是规定好的必须由volatile修饰的

还有的话之前我们在DCL单例中,也用了volatile保证了可见性

原子操作增强类原理深度解析

开篇的时候我们将原子类分为了红框和蓝框,这里就是蓝框的内容

//这几个都是java8开始有的,前面的都是java5就有了
DoubleAccumulator
DoubleAdder
LongAccumulator
LongAdder

面试题

  1. 热点商品点赞计算器,点赞数加加统计,不要求实时精确
  2. 一个很大的List,里面都是int类型,如何实现加加,说说思路

模拟下点赞计数器,看看性能

  • 要求:热点商品点赞计算器,点赞数加加统计,不要求实时精确

看看这个LongAdder

  • 看看这个LongAccumulator

image-20221211210747470

常用API

image-20221211210931126

快速入门

LongAdder只能用来计算加法 。且从零开始计算

LongAccumulator提供了自定义的函数操作 (利用lambda表达式)

public class LongAdderAPIDemo {
    public static void main(String[] args) {
        LongAdder longAdder = new LongAdder();

        longAdder.increment();
        longAdder.increment();
        longAdder.increment();

        System.out.println(longAdder.longValue());//3

        LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);//lambda表达式
        longAccumulator.accumulate(1);//1
        longAccumulator.accumulate(3);//4
        System.out.println(longAccumulator.get());//4
    }
}

LongAdder高性能对比Code演示

//需求:50个线程,每个线程100w次,计算总点赞数
class ClickNumber {
    int number = 0;

    public synchronized void addBySynchronized() {
        number++;
    }

    AtomicLong atomicLong = new AtomicLong(0);

    public void addByAtomic() {
        atomicLong.incrementAndGet();
    }

    LongAdder longAdder = new LongAdder();

    public void addByLongAdder() {
        longAdder.increment();
    }

    LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);

    public void addByLongAccumulator() {
        longAccumulator.accumulate(1);
    }
}

public class AccumulatorCompareDemo {
    public static final int _1W = 1000000;
    public static final int threadNumber = 50;

    public static void main(String[] args) throws InterruptedException {

        ClickNumber clickNumber = new ClickNumber();
        Long startTime;
        Long endTime;
        CountDownLatch countDownLatch1 = new CountDownLatch(50);
        CountDownLatch countDownLatch2 = new CountDownLatch(50);
        CountDownLatch countDownLatch3 = new CountDownLatch(50);
        CountDownLatch countDownLatch4 = new CountDownLatch(50);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= _1W; j++) {
                        clickNumber.addBySynchronized();
                    }
                } finally {
                    countDownLatch1.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch1.await();
        endTime = System.currentTimeMillis();
        System.out.println("costTime---" + (endTime - startTime) + "毫秒" + "\t" + "synchronized---" + clickNumber.number);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= _1W; j++) {
                        clickNumber.addByAtomic();
                    }
                } finally {
                    countDownLatch2.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("costTime---" + (endTime - startTime) + "毫秒" + "\t" + "atomicLong---" + clickNumber.atomicLong);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= _1W; j++) {
                        clickNumber.addByLongAdder();
                    }
                } finally {
                    countDownLatch3.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("costTime---" + (endTime - startTime) + "毫秒" + "\t" + "LongAdder---" + clickNumber.longAdder.sum());

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= _1W; j++) {
                        clickNumber.addByLongAccumulator();
                    }
                } finally {
                    countDownLatch4.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("costTime---" + (endTime - startTime) + "毫秒" + "\t" + "LongAccumulator---" + clickNumber.longAccumulator.longValue());
    }
    //costTime---2205毫秒  synchronized---50000000
    //costTime---435毫秒  atomicLong---50000000
    //costTime---86毫秒  LongAdder---50000000
    //costTime---84毫秒  LongAccumulator---50000000
}//印证了阿里卡法手册中说的 【如果是JDK8,推荐使用LongAdder对象,比AtomicLong性能更好(减少乐观锁的重试次数)】

源码、原理分析

架构

image-20221211211726779

LongAdder是Striped64的子类

public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;
 //---------------------------
 abstract class Striped64 extends Number {

原理(LongAdder为什么这么快)

官网说明和阿里要求
  • 阿里说明

image-20221211211913709

  • 官网说明

image-20221211212005306

LongAdder是Striped64的子类
Striped64
  • 重要的成员函数
//Number of CPUS, to place bound on table size       
// CPU数量,即cells数组的最大长度 
static final int NCPU = Runtime.getRuntime().availableProcessors();


//Table of cells. When non-null, size is a power of 2.
//单元格数组|cells数组,为2的幂,2,4,8,16.....,方便以后位运算
transient volatile Cell[] cells;

//基础value值,当并发较低时,只累加该值主要用于没有竞争的情况,通过CAS更新。
//Base value, used mainly when there is no contention, but also as
//a fallback during table initialization races. Updated via CAS.
transient volatile long base;

//创建或者扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁。
//Spinlock (locked via CAS) used when resizing and/or creating Cells. 
transient volatile int cellsBusy;

最重要的两个

image-20221211212057796

  • Striperd64中一些变量或者方法的定义

image-20221211212138769

Cell

是java.util.concurrent.atomic下Striped64的一个静态内部类

@sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

LongAdder为什么这么快

其实在小并发下情况差不多;但在高并发情况下,在AtomicLong中,等待的线程会不停的自旋,导致效率比较低;而LongAdder用cell[]分了几个块出来,最后统计总的结果值(base+所有的cell值),分散热点。

举个形象的例子,火车站买火车票,AtomicLong 只要一个窗口,其他人都在排队;而LongAdder 利用cell开了多个卖票窗口,所以效率高了很多。

image-20221211212325152

一句话

LongAdder的基本思路就是分散热点 ,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点 。

image-20221211212415600

  • 数学表达

内部有一个base变量,一个Cell[]数组。

base变量:非竞态条件下,直接累加到该变量上

Cell[]数组:竞态条件下,累加各个线程自己的槽Cell[i]中

源码解读深度分析

小总结
LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组 \ \ ,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。

image-20221211212555742

LongAdder.increment()

  • 1-add(1L)
public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;


	//Creates a new adder with initial sum of zero.
    public LongAdder() {
    }


	// Adds the given value.**     
	//  @param x the value to add*     
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    //真正干活的是longAccumulate
    //as表示cells引用
    //b表示获取的base值
    //v表示期望值
    //m表示cells数组的长度
    //a表示当前线程命中的cell单元格
  • uncontended代表没有冲突。

我们点进这个casBase发现他也是个CAS

final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

一开始竞争小的时候CAS能成功,也就是casBase能成功,然后cells也是空的,所以不会进到循环

image-20221211212907070

竞争大的时候,他会Cell[] rs = new Cell[2]; 新建两个cell, 此时≠ null ,条件满足了,进入循环。

然后这里还有一层循环,这里是多个if并排

image-20221211213018052

总结一下
1.最初无竞争时只更新base;
2.如果更新base失败后,首次新建一个Cell[]数组
3.当多个线程竞争同一个Cell比价激烈时,可能就要利用longAccumulate对Cell[]扩容。

image-20221211213049194

image-20221211213133776

2-longAccumulate

    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
            //这里是③ Cell数组不再为空且可能存在Cell数组扩容
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)//不能超过cpu核数
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];//扩容-左移一位,相当于x2
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            //这里是①初始化
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
//------可以先看这里,进行了初始化,长度是2
//------cells数组,为2的幂,2,4,8,16,方便以后位运算
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))//这里是②兜底
                break;                          // Fall back on using base
        }
    }

  • LongAccumulate入参说明

image-20221211213245637

  • Striped64中一些变量或者方法的定义

image-20221211213338380

  • 步骤
  • 我们先讲这个(a = as[getProbe() & m])里的probe,这里其实拿了hash值,通过hash值知道我们去到哪个cell槽
static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

//其实就是得到了线程的Hash值

所以最前面的这一段就像是新员工入职获取工号(hash值)一样

image-20221211213443516

  • 总纲

image-20221211213459159

上述代码首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:
CASE1:Cell[]数组已经初始化
CASE2:Cell[]数组未初始化(首次新建)
CASE3:Cell[]数组正在初始化中

  • 计算

①刚刚要初始化Cell[]数组(首次新建)

image-20221211213538925

如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,
rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素,value是x值,默认为1。
h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思。

②兜底

多个线程尝试CAS修改失败的线程会走到这个分支

//排在最后面的
else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
//该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。

③Cell数组不再为空且可能存在Cell数组扩容

多个线程同时命中一个cell的竞争,这个是最复杂的部分

(1)

image-20221211213708843

上面代码判断当前线程hash后指向的数据位置元素是否为空,
如果为空则将Cell数据放入数组中,跳出循环。
如果不空则继续循环。

(2)

image-20221211213802100

(3)

image-20221211213855508

说明当前线程对应的数组中有了数据,也重置过hash值,
这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环。

(4)

image-20221211213951898

(5)

image-20221211214015908

(6)

image-20221211214117193

  • 以上六步总结

image-20221211214153724

3-sum

image-20221211214213199

//LongAdder.java
public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

sum()会将所有Cell数组中的value和base累加作为返回值。
核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点

为啥在并发情况下sum的值不精确?

sum执行时,并没有限制对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性的,它是最终一致性的。

首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了 ,而此时局部变量sum不会更新,造成不一致。
其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果。

使用总结

AtomicLong
线程安全,可允许一些性能损耗,要求高精度时可使用

保证精度,性能代价

AtomicLong是多个线程针对单个热点值value进行原子操作

LongAdder
当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用

保证性能,精度代价

LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作

小总结

AtomicLong

原理
CAS+自旋

incrementAndGet

场景
低并发下的全局计算

AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题

缺陷
高并发后性能急剧下降

why?AtomicLong的自旋会称为瓶颈(N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。)

LongAdder

原理
CAS+Base+Cell数组分散

空间换时间并分散了热点数据

场景
高并发的全局计算

缺陷
sum求和后还有计算线程修改结果的话,最后结果不够准确

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/80946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

wpa_supplicant工具移植到嵌入式设备

1、wpa_supplicant源码下载 (1)源码下载地址&#xff1a;http://w1.fi/releases/&#xff1b; (2)本文是以wpa_supplicant-2.6.tar.gz版本进行移植&#xff1b; 2、编译openssl 2.1、确定适配的openssl版本 Optional libraries for EAP-TLS, EAP-PEAP, and EAP-TTLS: - OpenS…

【LeetCode】1827. 最少操作使数组递增

题目描述 给你一个整数数组 nums &#xff08;下标从 0 开始&#xff09;。每一次操作中&#xff0c;你可以选择数组中一个元素&#xff0c;并将它增加 1 。 比方说&#xff0c;如果 nums [1,2,3] &#xff0c;你可以选择增加 nums[1] 得到 nums [1,3,3] 。 请你返回使 nums …

ESXi8.0中NVME硬盘不识别解决方法1,设置直通

目录 1.前言 2.直通设置 3.槽点 1.前言 ESXi8.0删除了很多老版本的硬件的驱动程序&#xff0c;导致NVME1.3及更低协议的固态硬盘均无法被ESXi直接识别正如我手头准备了尚好的服务器专用PM983A却无法识别。本着不折腾先熟悉ESXi8.0的思路另外找了一块盘装了ESXi的系统。本以为…

云原生之使用Docker部署webssh工具sshwifty

云原生之使用Docker部署webssh工具sshwifty一、sshwifty介绍1.sshwifty简介2.shwifty 特点二、检查本地docker环境1.检查docker版本2.检查docker状态三、下载sshwifty镜像四、服务器生成凭证文件五、创建sshwifty容器1.创建部署目录2.创建sshwifty容器3.查看sshwifty容器状态六…

uniapp 之 小程序线上版本一直处于加载状态

前言 最开始小程序都是体验版的&#xff0c;后来应老大需求&#xff0c;把体验版提交审核为正式版本&#xff08;线上版本&#xff09;&#xff0c; 原本以为版本审核得花费几天时间&#xff0c;没想到它这审核速度挺快的&#xff0c;不到3小时就审核通过了&#xff0c;审核…

[go]汇编语言

文章目录计算机结构常量与变量全局变量常量数组字符串函数参数与返回值goroutineGo汇编程序无法独立使用&#xff0c;必须以Go包的方式组织&#xff0c;同时包中至少要有一个Go语言文件用于指明当前包名等基本包信息。如果Go汇编代码中定义的变量和函数要被其它Go语言代码引用&…

Spark的架构与基本运行流程

Spark的架构与基本运行流程一、Spark中的核心概念二、Spark中的核心架构设计一、Spark中的核心概念 &#xff08;1&#xff09;RDD&#xff0c;Spark中最核心的概念就是RDD&#xff08;Resillient Distributed Dataset&#xff0c;弹性分布式数据集&#xff09;。换而言之&…

MySQL---事务及锁机制

MySQL之事务以及锁机制 文章目录MySQL之事务以及锁机制事务事务的操作1、开启事务&#xff1a;start Transaction2、提交事务&#xff1a;commit Transaction3、回滚事务&#xff1a;Rollback Transactionset命令事务的特性---ACID事务的隔离级别1.READ UNCOMMITTED 读未提交2.…

毒鸡汤 | PHPStudy搭建web项目

文章目录前言展示准备工作环境创建网站新建数据库PHP7现成版自己折腾版前言 折腾了很久&#xff0c;终于自己改成功了。问题不多&#xff0c;主要原因是自己没怎么开发过&#xff0c;不熟悉数据库连接原理&#xff0c;现在回头看真的改的很简单。问题主要是现在用的PHP7和旧版的…

I-03数据结构与算法(python版)

最近学习数据结构&#xff0c;对于从未接触过数据结构的我来说&#xff0c;老师不仅讲解理论&#xff0c;还有代码的逐层分析&#xff0c;非常不错&#xff0c;受益匪浅&#xff01;&#xff01;&#xff01;&#xff08;以下是学习记录&#xff09; 数据结构与算法&#xff0…

DEiT:通过注意力训练数据高效的图像transformer 蒸馏

摘要 最近&#xff0c;纯基于注意力的神经网络被证明可以解决图像理解任务&#xff0c;如图像分类。这些高性能的是使用大型基础设施预先训练数以亿计的图像&#xff0c;从而限制了它们的采用。 本文仅通过在Imagenet上训练&#xff0c;产生有竞争力的无卷积transformer。我们…

react原理-transition概念

在react18之中&#xff0c;引入了transition的概念。而且有一个新的api和两个新的hooks startTransitionuseTransitionuseDeferredValue 场景应用&#xff1a; 比如通过输入框输入内容更新列表内容&#xff0c;对于用户来说&#xff0c;输入框输入之后立马反馈的优先级是高过…

VS ChatGPT 中文版插件安装

1.打开Visual Studio Code 2.搜索chatGpt中文版 3.安装完后&#xff0c;重启一下软件 有国模式和国外模式&#xff0c;更多的教程请看插件作者的视频教程

分布式事物

Seata实践 XA模式 AT模式 TCC模式 性能 Saga模式 高可用------集群的形式 Seata实践解决方案 解决方式 Seata 引入服务协调者模式 实践步骤&#xff1a; 分布式事物的入口方法&#xff0c;会调用其他的微服务&#xff0c;每次调用的服务都是一个分支事物调用了多少个分支事…

SpringSecurity 认证实现

在之前一篇 博客 已经说明了 SpringSecurity 认证与授权的原理。这篇用来具体实现一下。 1、新建SecurityConfig 并创建认证管理器 Bean public AuthenticationManager authenticationManager() {... }2、新建认证提供者 Configuration public class SystemUserPasswordAuth…

Oracle项目管理之PrimaveraUnifier组织-业主/合作伙伴公司

目录 一、业主公司 二、合作伙伴公司 三、成员公司 Oracle Primavera Unifier 是企业项目协同管理系统&#xff0c;在国际化项目管理中&#xff0c;在进行常规的业务管理之外&#xff0c;对合同公司/EPC或分包供应商也有一定的管理要求&#xff0c;在Unifier中为了更好的实现…

sja1000 CAN驱动学习、调试记录(基于PeliCan Mode)

一、基础知识 网上讲sja1000 CAN总线控制器的资料很多&#xff0c;这里放一个引路贴&#xff1a;(151条消息) CAN总线控制器SJA1000_FATE的博客-CSDN博客_sja1000 BasicCAN Mode&#xff1a;仅支持11位的ID。 PeliCan Mode&#xff1a;在扩展模式下&#xff0c;允许使用 11 位 …

找出DataFrame中指定数据类型的列:select_dtypes()函数

【小白从小学Python、C、Java】 【计算机等级考试500强双证书】 【Python-数据分析】 找出DataFrame中指定数据类型的列 select_dtypes()函数 选择题 下列说法错误的是? import pandas as pd myDF pd.DataFrame({A:[1,2],B:[1.0,2.0],C:[a,b]}) print("【显示】myDF&qu…

leecode#同构字符串#反转链表

题目描述&#xff1a; 给定两个字符串 s 和 t &#xff0c;判断它们是否是同构的。 如果 s 中的字符可以按某种映射关系替换得到 t &#xff0c;那么这两个字符串是同构的。 每个出现的字符都应当映射到另一个字符&#xff0c;同时不改变字符的顺序。不同字符不能映射到同一…

ReentrantLock详解

JUC中的锁API 在juc中有一个Lock接口他的作用和synchronized相似都是为了保证线程安全性提供的解决方案 Lock中定义了一系列释放锁和抢占锁相关的API lock() 抢占锁资源 如果当前线程没有抢占到锁 则阻塞 tryLock() 尝试抢占锁资源 如果抢占成功则返回true 否则返回false unlo…