目录
AtomicLong用法
源码分析
问题
解决
LongAdder用法
高并发下效率测试
原理
源码
add(long x)
Striped64的longAccumulate
伪共享
总结
视频讲解:
AtomicLong用法
public static void main(String[] args) {
AtomicLong i = new AtomicLong(0);
System.out.println(i.getAndIncrement());
System.out.println(i.addAndGet(30));
}
源码分析
如果value被修改了, CAS失败, 那就获取最新值,继续CAS, 直到成功~
问题
如果并发特别大,修改频繁,那么你要CAS自旋很多次, 效率大大降低
AtomicLong 的 Add 操作是依赖自旋不断的 CAS 去累加一个 Long 值。 如果在竞争激烈的情况下,CAS 操作不断的失败,就会有大量的线程不断的自旋尝试 CAS 会造成 CPU 的极大的消耗
解决
使用LongAdder
LongAdder用法
LongAdder longAdder = new LongAdder();
longAdder.increment();
longAdder.add(10);
longAdder.sum();
高并发下效率测试
@Slf4j
public class Demo12 {
public static void main(String[] args) throws Exception {
long start1 = System.currentTimeMillis();
testLongAdder();
System.out.println("LongAdder 耗时:" + (System.currentTimeMillis() - start1) + "ms");
long start2 = System.currentTimeMillis();
testAtomicLong();
System.out.println("AtomicLong 耗时:" + (System.currentTimeMillis() - start2) + "ms");
System.out.println("----------------------------------------");
}
static void testAtomicLong() throws Exception{
AtomicLong atomicLong = new AtomicLong();
List<Thread> list = new ArrayList();
for (int i = 0; i < 9999; i++) {
list.add(new Thread(() -> {
for(int j=0;j<100000;j++){
atomicLong.incrementAndGet();
}
}));
}
for (Thread thread : list) {
thread.start();
}
for (Thread thread : list) {
thread.join();
}
System.out.println("AtomicLong value is : " + atomicLong.get());
}
static void testLongAdder() throws Exception{
LongAdder longAdder = new LongAdder();
List<Thread> list = new ArrayList();
for (int i = 0; i < 9999; i++) {
list.add(new Thread(() -> {
for(int j=0;j<100000;j++){
longAdder.increment();
}
}));
}
for (Thread thread : list) {
thread.start();
}
for (Thread thread : list) {
thread.join();
}
System.out.println("LongAdder value is : " + longAdder.longValue());
}
}
原理
代码看不懂,请看我讲解的视频哈链接:
点我进入
源码
add(long x)
public class LongAdder extends Striped64 implements Serializable {
public void add(long x) {
//as 表示cells 引用
//b 表示 base
//v 表示 期望值表示获取的base值
//m 表示cells 数组的长度
//a 表示当前线程命中的 cell单元格
Cell[] as; long b, v; int m; Cell a;
//条件一: true-> 表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
// false-> 表示cells 未初始化,当前所有线程应该将数据写入base中
//条件二:true-> 表示当前线程cas替换数据成功
//false-> 表示发生竞争了,可能需要重试 或者 扩容
//true 是指casBase(b = base, b + x),不是整体,是部分
//true时,取反是false
if ((as = cells) != null || !casBase(b = base, b + x)) {
//true 未竞争 false 发生竞争
boolean uncontended = true;
//条件一:true-> 说明 cells 未初始化,也就是多线程写base发生竞争
// false-> 说明 cells 已经初始化了,当前线程应该是 找自己的cell 写值
if (as == null || (m = as.length - 1) < 0 ||
//条件二:getProbe() 获取当前线程的hash值, m表示 cells长度-1 cells长度 一定是2的次方数 16-1=15=1111 二进制
// true-> 说明当前线程对应下标的cell 为空 ,需要创建longAccumulate 支持
// false-> 说明当前线程对应的cell 不为空,说明 下一步想要将x值,添加到cell中
(a = as[getProbe() & m]) == null ||
//条件三:整体而言,有取反,true-> uncontended为false cas操作失败,意味着当前线程对应的cell 有竞争
//false-> 表示cas成功
!(uncontended = a.cas(v = a.value, v + x)))
//哪些情况会调用
//true-> 说明 cells 未初始化,也就是多线程写base发生竞争
//true-> 说明当前线程对应下标的cell 为空 ,需要创建longAccumulate 支持
//true-> cas操作失败,意味着当前线程对应的cell 有竞争
longAccumulate(x, null, uncontended);
}
}
}
Striped64的longAccumulate
代码看不懂,请看我讲解的视频哈链接:
【总结者】LongAdder源码讲解(图解+代码逐行分析)4K面试必看_哔哩哔哩_bilibili
点我进入
Striped64是一种高并发累加器,有效解决了原子类累加的弊端。Striped64将线程竞争的操作分散开来,每个线程操作一个cell,而sum则等于base和所有cell值的和。
abstract class Striped64 extends Number {
//1、true-> 说明 cells 未初始化,也就是多线程写base发生竞争
//2、true-> 说明当前线程对应下标的cell 为空 ,需要创建longAccumulate 支持
//3、true-> cas操作失败,意味着当前线程对应的cell 有竞争
//longAccumulate(x, null, uncontended);
//wasUncontended :只有cells 初始化之后,并且当前线程 竞争修改失败,才会是false
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// h 表示线程hash值
int h;
//条件成立:说明当前线程 还未分配线程
if ((h = getProbe()) == 0) {
//给当前线程分配hash值
ThreadLocalRandom.current(); // force initialization
//取出当前线程的hash值
h = getProbe();
// 因为默认情况下 , 肯定是写入到了 cells[0]位置,不把它当做一次真正的竞争
//因为没有hash值,所有才会在cells[0]竞争,所有表示一次真正意义上的竞争
wasUncontended = true;
}
//表示扩容意向 false 一定不会扩容,true 可能会扩容
boolean collide = false; // True if last slot nonempty
//自旋
for (;;) {
// as 表示cells引用
// a 表示当前线程命中的cell
// n 表示cells 数组长度
// v 表示 期望值
Cell[] as; Cell a; int n; long v;
// CASE1:表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
if ((as = cells) != null && (n = as.length) > 0) {
//下面两种情况才会进入该if
//2、true-> 说明当前线程对应下标的cell 为空 ,需要创建longAccumulate 支持
//3、true-> cas操作失败,意味着当前线程对应的cell 有竞争
// CASE1.1: true->表示当前线程 对应的下标未知的cell 为null,需要创建new Cell
if ((a = as[(n - 1) & h]) == null) {
//true -> 表示当前 锁未被占用
if (cellsBusy == 0) { // Try to attach new Cell
//拿当前的x创建Cell
Cell r = new Cell(x); // Optimistically create
//条件一:true-> 表示当前锁未被占用 false -> 表示锁被占用
//条件二:true-> 表示当前线程获取锁成功, false-> 当前线程获取锁失败
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
// rs 表示当前cells引用
// m 表示cells长度
// j 表示当前线程命中的下标
Cell[] rs; int m, j;
//条件一 条件二 恒成立
//rs[j = (m - 1) & h] == null 是为了防止其它线程初始化 该位置,然后当前线程再次初始化该位置
//在上面最近的if 时,其它线程可能抢走锁,让 rs[j]已经赋值,当该线程抢回锁之后可以防止重复赋值
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
//扩容意向更改为false (a = as[(n - 1) & h]) == null
//因为当前cell为 null,cells够用,所以不需要扩容
collide = false;
}
//CASE1.2:
//只有当前线程的hash值不是0,并且和其它线程一起打在了同一个cell上时,wasUncontended才是false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//CASE1.3:当前线程rehash 过 hash值,然后新命中的cell不为空
//true -> 写成功,退出循环
//false -> 表示rehash之后命中的cell 也有竞争,重试一次
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//CASE1.4:
//条件一:n >= NCPU true-> 扩容意向为false,表示不扩容了 false-> 表示还可以扩容
//条件二:cells != as true -> 其它线程已经扩容过了,当前线程rehash
else if (n >= NCPU || cells != as)
//扩容意向:改为false,表示不扩容了
collide = false; // At max size or stale
//CASE 1.5:
//collide取反 设置扩容意向 为true,但不一定扩容
else if (!collide)
collide = true;
//CASE 1.6:真正的扩容
// 条件一:cellsBusy == 0 当前线程无锁状态,可以去竞争
// 条件二:casCellsBusy() 当前线程获取锁,true:可以做扩容逻辑,false:表示其它线程正在做扩容相关操作
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//cells == as 当其它线程抢了线程,先扩容,当前线程抢回后,可以防止再次扩容
// 防止重复扩容
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//重置hash值 rehash
h = advanceProbe(h);
}
// CASE2:前置条件cells 还未初始化 as 为 null
// 条件一:cellsBusy 表示未加锁
// 条件二:cells == as? 因为其他线程可能会在你给as赋值之后 修改了 cells
// 条件三:true 表示获取锁成功 会把cellsBusy = ,false表示其他线程正在持有锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//1、true-> 说明 cells 未初始化,也就是多线程写base发生竞争
boolean init = false;
try { // Initialize table
// cells == as 又要对比,防止再次初始化
// 防止其他线程已经初始化了,当前线程再次初始化 导致丢失数据
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// CASE3:
// 1、当前cellBusy 加锁状态,表示其它线程正在初始化cells,所有当前线程将值累加到base
// 2、cells被其他线程初始化后,当前线程需要将数据累加到base
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
}
伪共享
上面可以看到 Cell
类被 @sun.misc.Contended
注解了, 是用来避免缓存的伪共享
CPU Cache Line
在计算机的架构中 L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。 L1缓存很小但很快,并且紧靠着在使用它的CPU内核; L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用; L3更大、更慢,并且被单个插槽上的所有CPU核共享; 最后是主存,由全部插槽上的所有CPU核共享
Cache 是由很多个cache line(缓存行)组成的。 每个cache line通常是 64 字节,并且它有效地引用主内存中的一块地址。 一个Java的long类型变量是 8 字节,因此在一个缓存行中可以存 8 个long类型的变量
多个线程并发的修改一个缓存行中的不同变量的时候, 比如CPU1更新a, CPU2更新b, 但是a和b在同一个缓存行上,每个线程都要去竞争缓存行的所有权来更新变量。 如果核心1获得了所有权,缓存子系统将会使核心2中对应的缓存行失效。 当核心2获得了所有权然后执行更新操作,核心1就要使自己对应的缓存行失效。 这会来来回回的经过L3缓存,大大影响了性能。
避免伪共享
避免伪共享的情况出现, 就需要让可能出现线程竞争的变量分开到不同的 Cache Line 中, 使用空间换时间的思维
JDK8 有专门的注解 @Contended
来避免伪共享
总结
add(long x):
如果 cells 数组不为空, 或者 cas 操作 base 失败, 则进入longAccumulate
longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended):
整个 for(;;) 死循环,都是以 cas 操作成功而告终。否则则会修改上述描述的几个标记位,重新进入循环。
包含几种情况:
cells 不为空
如果 cell[i] 某个下标为空,则 new 一个 cell,并初始化值,然后退出
如果 cas 失败,继续循环
如果 cell 不为空,且 cell cas 成功,退出
如果 cell 的数量,大于等于 cpu 数量或者已经扩容了,继续重试。(扩容没意义)
设置 collide 为 true。
获取 cellsBusy 成功就对 cell 进行扩容,获取 cellBusy 失败则重新 hash 再重试。
cells 为空且获取到 cellsBusy ,init cells 数组,然后赋值退出。
cellsBusy 获取失败,则进行 baseCas ,操作成功退出,不成功则重试。
视频讲解:
代码看不懂,请看我讲解的视频哈链接:
【总结者】LongAdder源码讲解(图解+代码逐行分析)4K面试必看_哔哩哔哩_bilibili