【JUC】二十三、LongAdder:多线程计数的更优解

news2024/11/26 13:43:16

文章目录

  • 1、常用API
  • 2、热点商品点赞计算器
  • 3、LongAdder高性能的原理
  • 4、源码:LongAdder-add方法
  • 5、源码:LongAdder-longAccumulate方法
  • 6、源码:LongAdder-sum方法
  • 7、AtomicLong和LongAdder的对比

Since 1.8,新加原子操作增强类:

  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder

API文档:runoob.com/manual/jdk11api/java.base/java/util/concurrent/atomic/LongAdder.html

1、常用API

常用方法:
在这里插入图片描述

LongAdder只能用来计算加法,且从零开始计算,LongAccumulator则更灵活,可传入一个函数式接口和初始值,函数式接口中自定义计算逻辑,加减乘除。

LongAdder longAdder = new LongAdder();
longAdder.increment();  //+1
longAdder.increment();
longAdder.increment();
System.out.println(longAdder.sum());  //3
//new LongAccumulator((x, y) -> x * y, 1)
LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
    @Override
    //left:初始值,right:传进来的值
    public long applyAsLong(long left, long right) {
        return left * right;
    }
}, 1);
longAccumulator.accumulate(2); //2
longAccumulator.accumulate(3); //6
System.out.println(longAccumulator.get());  //6

2、热点商品点赞计算器

热点商品点赞计算器,点赞数加加统计,不要求实时精确。比如:50个线程,每个线程100W次,总点赞数出来。分析下,点赞一次就+1,本质是多线程下的并发的i++:

class ClickNumber{
    int number = 0;
    //方式一
    public synchronized void clickBySynchronized(){
        number++;
    }
    //方式二
    AtomicLong atomicLong = new AtomicLong(0);
    public void clickByAtomicLong(){
        atomicLong.getAndIncrement();
    }
    //方式三
    LongAdder longAdder = new LongAdder();
    public void clickByLongAdder(){
        longAdder.increment();
    }
    //方式四
    LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x+y,0);
    public void clickByLongAccumulator(){
        longAccumulator.accumulate(1);
    }
}

调用4种方式,借助辅助类计数器,计算5000000点赞耗时:

public class AccumulatorDemo {
    public static final int _1W = 10000;
    public static final int threadNum = 50;

    public static void main(String[] args) throws InterruptedException {
        ClickNumber clickNumber = new ClickNumber();   //共享资源类对象
        long startTime;
        long endTime;
        CountDownLatch countDownLatch1 = new CountDownLatch(threadNum);  //计数器
        CountDownLatch countDownLatch2 = new CountDownLatch(threadNum);
        CountDownLatch countDownLatch3 = new CountDownLatch(threadNum);
        CountDownLatch countDownLatch4 = new CountDownLatch(threadNum);
        //====方法一的耗时===
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNum ; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < 100 * _1W; j++) {
                        clickNumber.clickBySynchronized();
                    }
                } finally {
                    //50个线程,做完一个少一个
                    countDownLatch1.countDown();
                }
            },String.valueOf(i)).start();

        }
        countDownLatch1.await();
        endTime = System.currentTimeMillis();
        System.out.println("synchronized耗时:" + (endTime - startTime) + ",当前点赞数:" + clickNumber.number);
        //====方法2的耗时===
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNum ; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < 100 * _1W; j++) {
                        clickNumber.clickByAtomicLong();
                    }
                } finally {
                    //50个线程,做完一个少一个
                    countDownLatch2.countDown();
                }
            },String.valueOf(i)).start();

        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("AtomicLong耗时:" + (endTime - startTime) + ",当前点赞数:" + clickNumber.atomicLong.get());
        //====方法3的耗时===
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNum ; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < 100 * _1W; j++) {
                        clickNumber.clickByLongAdder();
                    }
                } finally {
                    //50个线程,做完一个少一个
                    countDownLatch3.countDown();
                }
            },String.valueOf(i)).start();

        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("LongAdder耗时:" + (endTime - startTime) + ",当前点赞数:" + clickNumber.longAdder.sum());
        //====方法4的耗时===
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= threadNum ; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < 100 * _1W; j++) {
                        clickNumber.clickByLongAccumulator();
                    }
                } finally {
                    //50个线程,做完一个少一个
                    countDownLatch4.countDown();
                }
            },String.valueOf(i)).start();

        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("LongAccumulator耗时:" + (endTime - startTime) + ",当前点赞数:" + clickNumber.longAccumulator.get());
    }
}

运算结果:

在这里插入图片描述

结论:很大的高并发下,LongAdder的性能优于AtomicLong(减少了乐观锁的重试次数)

在这里插入图片描述

3、LongAdder高性能的原理

LongAdder --> Striped64类 --> Number类,Cell类,单元格类,是Striped64类的一个内部类。

在这里插入图片描述

Striped64类的属性解释:

在这里插入图片描述
前面的AtomicLong,N个线程同时CAS修改一个值,每次只会有一个成功,而其余N-1个线程一定失败而继续不停的自旋,N很大时,就会有大量的失败自旋。

而LongAdder的基本思路就是分散热点,不要逮着一个值自旋,而是将value值分散到一个Cell数组中,不同线程会命中到Cell数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

sum()时会将所有Cel数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,每次并发CAS的失败线程数量就少了。

在这里插入图片描述在这里插入图片描述

LongAdder在无竞争的情况下,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零分散热点的做法,用空间换时间,新加一个数组cells,将一个value的累加拆分进这个数组cells来分担。

在这里插入图片描述

多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到cells数组的某下标,然后对该下标所对应的值进行自增换作。当所有线程操作完毕,将数组cells的所有值和base都加起来作为最终结果。

4、源码:LongAdder-add方法

new LongAdder().increment()

各方法底层的调用关系为:

increment() --> add(1L)  --> longAccumulate()  
//最后累加的结果
--> sum()

在这里插入图片描述
从add方法开始看,方法局部变量有:

  • as:Striped64类的cells数组
  • b:Striped64类的的base值
  • v:期望值
  • m:cells数组的长度
  • a:当前线程命中的cell数组中的cell单元格对象

首次,只有一个线程的时候,base就可以完成,cell == null,casBase方法+1的操作操作成功,if条件不成立,直接跳出循环,但已经在||条件判断的时候顺带着完成了+1的操作。

public void add(long x) {
    Cell[] cs; long b, v; int m; Cell c;
    if ((cs = cells) != null || !casBase(b = base, b + x)) {
    	// true无竞争,false表示竟争激烈,多个线hash到同一个CeLL,可能要扩容
        boolean uncontended = true;
        //||下的四个条件分别为:
        //条件1:cell单元格数组为空
        //条件2:cell长度小于1,一般不会,因为到这儿说明cell不为null,而其长度2次幂起步
        //条件3:getProbe获取当前线程的哈希值,映射到cell后,cell为空,说明当前线程还没更新过cell,应初始化一个cell
        //条件4:更新当前线程所映射的cell失败,即多个线程hash到了同一个cell,说明竞争激烈,取反后到longAccumulate继续扩容
        if (cs == null || (m = cs.length - 1) < 0 ||
        	//getProbe方法返回的时线程中的threadLocalRandomProbe字段
        	//它是通过随机数生成的一个值,对于一个确定的线程,这个值固定,除非刻意修改
            (c = cs[getProbe() & m]) == null ||
            !(uncontended = c.cas(v = c.value, v + x)))
            longAccumulate(x, null, uncontended);   //Striped64中的方法扩容
    }
}

随着线程增多,CAS判断失败,false,取反后条件成立,进入if体中,uncountened=true,默认没有冲突,此时还没扩容,Cell[] as == null是成立的,进入longAccumulate(),按2次幂阔,出来两个Cell。(longAccumulate方法下面再详细展开)

在这里插入图片描述

此时,base可以+1,cell0、cell1也可以做+1,再调add,此时Cell[] as不再等于null,且length-1=1>0(2次幂) ,继续看||后面的条件,a = as[getProbe() & m],算坑位,比如算到了cell1这个单元格,此时,假设cell1中有值,为1,做个CAS,x为1,则1改为2,返回true,取反为false,跳出方法,但+1也随着条件判断完成了。

在这里插入图片描述

如上图,再并发,竞争激烈,cell0、cell1扛不住了也,cell上cas失败,uncontended为false,取反,true,再进入longAccumulate()扩容,2个变4个。

总结:

  • 最初无竞争时只更新base;
  • 如果更新base失败后,首次新建一个Cell[ ]数组
  • 当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[ ]扩容

代码亮点:调用方法做为判断条件,最终的效果就是活儿干了(数据改变了),条件也做了判断了。借鉴!

简略版图解:

在这里插入图片描述

5、源码:LongAdder-longAccumulate方法

Striped64类中的一些属性和方法:getProbe方法,获取线程的hash值,这个值用于判断去cell数组的哪个槽位中去。

在这里插入图片描述

//getProbe方法返回的时线程中的threadLocalRandomProbe字段
static final int getProbe(){
	return UNSAFE.getInt(Thread.currentThread(),PROBE);
}

longAccumulate()方法的入参:

  • long x :需要做雷加的值,increment调用下,一般默认都是+1
  • LongBinaryOperator fn :默认传递的是null
  • wasUncontended:竞争标识,如果是false则代表有竞争,只有cells初始化之后,并且当前线程CAS竞争修改失败,才会是false

longAccumulate方法开头处理下线程的probe值:

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //存储线程的probe值                      
        int h;
        //getProbe返回为0,即线程随机数未初始化
        if ((h = getProbe()) == 0) {
        	//使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
            ThreadLocalRandom.current(); // force initialization
            //重新获取probe值,hash被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true,表示无竞争
            h = getProbe();
            //重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,背定还不存在竟争激烈,wasUncontended竞争状态为true
            wasUncontended = true;
        }
//......

然后longAccumulate方法源码大体结构为:首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:

  • CASE1: Cell[ ] 数组已经初始化

  • CASE2:CelI[ ] 数组未初始化(首次新建)

  • CASE3:Cell[ ] 数组正在初始化中

在这里插入图片描述

先看Case2:未初始化过Cell[ ] 数组,尝试占有锁并首次初始化cells数组

cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁。为0时,抢到锁,&&后面casCellsBusy改为1,初始化创建Cell[2]后,finally中cellsBusy改回0,注意下面有点双重检锁的味道。

在这里插入图片描述

如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1]= new Cell(x) 即创建一个新的Cell元素,value是累加的值x,默认为1。h & 1类似于之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1),同hashmap一个意恩。

再看Case3:上面竞争很激烈,else兜底的,多个线程尝试CAS修改失败的线程会走到这个分支

该分支直接操作base,将值累加到base

在这里插入图片描述

最后看Case1:Cell数组不再为空且可能存在Cell数组扩容,多个线程同时命中一个cell的竞争。此个If分支又分为6中if情况:

在这里插入图片描述

上面代码判断当前线程hash后指向的数据位置元素是否为空,如果为空则将Cell数据放入数组中,跳出循环。如果不空则继续循环。

在这里插入图片描述

wasUncontended表示cells初始化后,当前线程竞争修改失败wasUncontended =false,这里只是重新设置了这个值为true,紧接着执行Striped64类的advanceProbe(h)方法重置当前线程的hash,重新循环,重新再竞争一次。

在这里插入图片描述说明当前线程对应的数组中有了数据,也重置过hash值,这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接break跳出循环。

在这里插入图片描述

如果n大于CPU最大数量,不可扩容并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试。

在这里插入图片描述

如果扩容意向collide是false则修改它为true,然后重新算当前线程的hash值继续循环,如果当前数组的长度已经大于了CPU的核数,就会再次设置扩容意向collide=false (见上一步)

在这里插入图片描述

总结:

在这里插入图片描述
在这里插入图片描述

6、源码:LongAdder-sum方法

在这里插入图片描述
sum()会将斯有Cell数组中的value和base累加作为返回值。核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

在这里插入图片描述

为啥并发情况下sum的值不精确?

sum执行时,并没有限制对base和cells的更新(一句要命的话),所以LongAdder不是强一致性的,它是最终一致性的

首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致。其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果。

7、AtomicLong和LongAdder的对比

AtomicLong:

  • 通过CAS+自旋实现
  • 线程安全,可允许一些性能损耗,要求高精度时选AtomicLong
  • 保证精度,但以性能为代价
  • AtomicLong是多个线程针对单个热点值value进行原子操作
  • 缺点是高并发下,性能急剧下降,且AtomicLong的自旋同时也是瓶颈:因为N个线程同时CAS一个值,只有一个线程成功,其余N-1个线程要不断自旋

LongAdder:

  • 通过CAS+Base +Cell数组分散热点来实现
  • 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
  • 保证性能,但以精度为代价
  • LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作
  • 缺陷是:sum求和后,还有计算线程修改结果的话,最后返回的结果不够准确

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1296449.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

elementUI中的 “this.$confirm“ 基本用法,“this.$confirm“ 调换 “确认“、“取消“ 按钮的位置

文章目录 前言具体操作总结 前言 elementUI中的 "this.$confirm" 基本用法&#xff0c;"this.$confirm" 调换 "确认"、"取消" 按钮的位置 具体操作 基本用法 <script> this.$confirm(这是数据&#xff08;res.data&#xff0…

『Redis』在Docker中快速部署Redis并进行数据持久化挂载

&#x1f4e3;读完这篇文章里你能收获到 在Docke中快速部署Redis如何将Redis的数据进行持久化 文章目录 一、拉取镜像二、创建挂载目录1 宿主机与容器挂载映射2 挂载命令执行 三、创建容器—运行Redis四、查看运行情况 一、拉取镜像 版本号根据需要自己选择&#xff0c;这里以…

现代皮质沙发模型材质编辑

在线工具推荐&#xff1a; 3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 当谈到游戏角色的3D模型风格时&#xff0c;有几种不同的风格&#xf…

微前端 模块联邦技术

目录 介绍 基本使用 演示用法 初始化配置文件 remote 项目 host 项目 为什么讲这个呢&#xff0c;很多人觉得他不是微前端&#xff0c;也有人定义它也是微前端&#xff0c;看怎么理解了&#xff0c;我觉得他是一个去中心化技术&#xff0c;它可以让多个独立构建的应用…

pytorch的二次索引矩阵无法赋值问题

最近在研究中发现torch一个问题&#xff0c;即torch的二次索引的矩阵无法赋值。 具体来说&#xff0c;给定相同的初始常数矩阵a和iou_target矩阵, 以及另一iou矩阵&#xff0c;直接赋值是没问题的。 然而&#xff0c;当对iou_target矩阵进行二次索引时&#xff0c;即idx矩阵和…

基于AIS数据的船舶密度计算与规律研究

参考文献&#xff1a;[1]陈晓. 基于AIS数据的船舶密度计算与规律研究[D].大连海事大学,2021.DOI:10.26989/d.cnki.gdlhu.2020.001129. 谢谢姐姐的文章&#xff01; 网格化AIS数据 网格化 AIS 数据是处理和分析船舶轨迹数据的一种有效方法&#xff0c;特别是当涉及到密度计算和…

驾驭苹果的人工智慧模式:克服反击与应对挑战

苹果一年一度的秋季「春晚」时间越来越近&#xff0c;但在大模型浪潮下&#xff0c;苹果何时推出自己的「苹果GPT」成了另一个关注的话题。 毕竟&#xff0c;前有华为&#xff0c;后有小米&#xff0c;在中国手机厂商争相将大模型装进移动终端的同时&#xff0c;苹果却依旧对A…

Java Spring + SpringMVC + MyBatis(SSM)期末作业项目

本系统是一个图书管理系统&#xff0c;比较适合当作期末作业主要技术栈如下&#xff1a; - 数据库&#xff1a;MySQL - 开发工具&#xff1a;IDEA - 数据连接池&#xff1a;Druid - Web容器&#xff1a;Apache Tomcat - 项目管理工具&#xff1a;Maven - 版本控制工具&#xf…

ElementUI 时间选择器如何限定选择时间

DatePicker 日期选择器 | Element Plus 我们如何限定我们的选择时间呢&#xff0c;比如限定选择时间为今天之前&#xff0c;或者今天之后的时间&#xff1f; 我们可以使用官方提供的disabled-date来实现 我们通过这个属性 做一个回调函数&#xff0c;在里面比较我们想要限定的时…

CSS——标准流、浮动、Flex布局

1、标准流 标准流也叫文档流&#xff0c;指的是标签在页面中默认的排布规则&#xff0c;例如&#xff1a;块元素独占一行&#xff0c;行内元素可以一行显示多个。 2、浮动 作用&#xff1a;让块元素水平排列 属性名&#xff1a;float 属性值&#xff1a; left&#xff1a;…

spark链接hive时踩的坑

使用spark操作hive&#xff0c;使用metastore连接hive&#xff0c;获取hive的数据库时&#xff0c;当我们在spark中创建数据库的时候&#xff0c;创建成功。 同时hive中也可以看到这个数据库&#xff0c;建表插入数据也没有问题&#xff0c;但是当我们去查询数据库中的数据时&a…

泽攸科技二维材料转移台的应用场景及优势

随着二维材料的广泛研究和各种潜在应用的开发&#xff0c;对于二维材料样品的精密操控与转移的需求日益增加。特别是一些新型二维材料的制备和器件集成制备中&#xff0c;需要在显微镜下对样品进行观察与定位&#xff0c;并能够在微米甚至纳米量级上精确移動和转移样品。 传统…

Navicat 技术指引 | 适用于 GaussDB 分布式的备份/还原功能

Navicat Premium&#xff08;16.3.3 Windows 版或以上&#xff09;正式支持 GaussDB 分布式数据库。GaussDB 分布式模式更适合对系统可用性和数据处理能力要求较高的场景。Navicat 工具不仅提供可视化数据查看和编辑功能&#xff0c;还提供强大的高阶功能&#xff08;如模型、结…

(六) python观察者设计模式

6.1行为型模式简介 观察者设计模式是最简单的行为型模式之一,所以我们先简单了解一下行为型模式 创建型模式的工作原理是基于对象的创建机制的。由于这些模式隔离了对象的创建细 节&#xff0c;所以使得代码能够与要创建的对象的类型相互独立。结构型模式用于设计对象和类的结…

BGP综合

1、使用PreVal策略&#xff0c;确保R4通过R2到达192.168.10.0/24。 2、使用AS_Path策略&#xff0c;确保R4迪过R3到达192.168.11.0/24。 3、配置MED策略&#xff0c;确保R4通过R3到达192.168.12.0/24。 4、使用Local Preference策略&#xff0c;确保R1通过R2到达192.168.1.0…

IntelliJ idea卡顿解决,我遇到的比较管用的方案

Setttings> Build, Execution,Deployment>Debugger> Data Views> Java 取消 Enable "toString()" object view; Speed up debugging in IntelliJ Yesterday, I observed painfully slow debugging in IntelliJ. Every step over or step in took almost…

近期Chrome浏览器 不知哪个版本升级后原先http强制跳转到https,导致服务端302强制跳转到http也没反应

关于Chrome更新http强制跳转到https解决方法 近期Chrome浏览器 不知哪个版本升级后原先http强制跳转到https&#xff0c;导致服务端302强制跳转到http也没反应一、F12检查加载的Response Headers中有没有Non-Authoritative-Reason二、找了资料后得到解决方案&#xff1a;三、找…

在阿里云国际上构建共享虚拟主机业务

我们需要3个ECS实例&#xff0c;1个RDS MySQL实例和2个域名。我将使用该域作为我的主域和辅助域。sarathy.infosarathy.site 以下架构图左侧所示的两个 ECS 实例将托管我们的主网站。一个ECS实例用于部署WHMCS&#xff0c;另一个实例用于部署WordPress。WordPress 和 WHMCS 都…

pytorch-mask-rcnn 官方

This is a Pytorch implementation 实现 of Mask R-CNN that is in large parts based on Matterports Mask_RCNN. Matterports repository is an implementation on Keras and TensorFlow. The following parts of the README are excerpts 摘录 from the Matterport README. …

Amazon CodeWhisperer 提供新的人工智能驱动型代码修复、IaC 支持以及与 Visual Studio 的集成...

Amazon CodeWhisperer 的人工智能&#xff08;AI&#xff09;驱动型代码修复和基础设施即代码&#xff08;IaC&#xff09;支持已正式推出。Amazon CodeWhisperer 是一款用于 IDE 和命令行的人工智能驱动型生产力工具&#xff0c;现已在 Visual Studio 中推出&#xff0c;提供预…