👽System.out.println(“👋🏼嗨,大家好,我是代码不会敲的小符,双非大四,Java实习中…”);
📚System.out.println(“🎈如果文章中有错误的地方,恳请大家指正!共同进步,共同成长✊”);
🌟System.out.println(“💡如果文章对您有所帮助,希望您可以三连支持一下博主噢🔥”);
🌈System.out.println("🚀正在完成计划中:接下来的三个月里,对梦想的追逐 ");
文章目录
- 故事背景
- CAS (Compare and Swap)
- LongAdder源码
- LongAdder 原理之伪共享
- LongAdder.add
- LongAdder.longAccumulate
- LongAdder.sum
- UnSafe
- 最后
故事背景
中午吃饭的时候,大哥们聊到了LongAdder 这个类的实现问题,当时我也有点忘记了具体源码实现,只记得是多线程下提高cpu的利用率,减少自旋时间。利用了分片锁,进行累加的操作!但是大哥们聊到了内存层面!我又回来复习了一下源码!
CAS (Compare and Swap)
compareAndSet:内存位置 V、旧的预期值 A 和新的值 B
cas和volatile可以实现无锁并发,适用于线程数少,多核cpu的场景
- CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再 重试呗。
- synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想 改,我改完了解开锁,你们才有机会。
- CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思 【上下文切换非常少】
- 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
- 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
LongAdder源码
几个关键域
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy
LongAdder 原理之伪共享
Cell
// 防止缓存行伪共享 伪共享:一个缓存行加入了多个cell对象,导致都失效
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
缓存
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long) 缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中 CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
Core-0 要修改 Cell[0]、Core-1 要修改 Cell[1]。无论谁修改成功都会导致缓存行失效
@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
LongAdder.add
public void add(long x) {
// as 为累加单元数组
// b 为基础值
// x 为累加值
Cell[] as; long b, v; int m; Cell a;
// 进入 if 的两个条件
// 1. as 有值, 表示已经发生过竞争, 进入 if
// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
if ((as = cells) != null || !casBase(b = base, b + x)) {
// uncontended 表示 cell 没有竞争
boolean uncontended = true;
if (
// as 还没有创建
as == null || (m = as.length - 1) < 0 ||
// 当前线程对应的 cell 还没有
(a = as[getProbe() & m]) == null ||
// cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )
!(uncontended = a.cas(v = a.value, v + x)) ) {
// 进入 cell 数组创建、cell 创建的流程
longAccumulate(x, null, uncontended);
}
}
}
LongAdder.longAccumulate
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
// 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell
if ((h = getProbe()) == 0) {
// 初始化 probe
ThreadLocalRandom.current();
// h 对应新的 probe 值, 用来对应 cell
h = getProbe();
wasUncontended = true;
}
// collide 为 true 表示需要扩容
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
// 已经有了 cells
if ((as = cells) != null && (n = as.length) > 0) {
// 还没有 cell
if ((a = as[(n - 1) & h]) == null) {
// 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x
// 成功则 break, 否则继续 continue 循环
}
// 有竞争, 改变线程对应的 cell 来重试 cas
else if (!wasUncontended)
wasUncontended = true;
// cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
// 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 cas
else if (n >= NCPU || cells != as)
collide = false;
// 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
else if (!collide)
collide = true;
// 加锁
else if (cellsBusy == 0 && casCellsBusy()) {
// 加锁成功, 扩容
continue;
}
// 改变线程对应的 cell
h = advanceProbe(h);
}
// 还没有 cells, 尝试给 cellsBusy 加锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell
// 成功则 break;
}
// 上两种情况失败, 尝试给 base 累加
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
}
}
LongAdder.sum
public long sum() {
Cell[] as = cells;
Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
UnSafe
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
public class Cas_UnSafe {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
/**
* @author xiaofu
* 底层的UnSafe对象
*/
public class Demo12UnSafe {
public static void main(String[] args) throws NoSuchFieldException {
Unsafe unsafe = Cas_UnSafe.getUnsafe();
System.out.println(unsafe);
Teacher teacher = new Teacher();
// 1.获取域的偏移地址
long id = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
long name = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));
// 2.执行 cas 操作
unsafe.compareAndSwapInt(teacher, id, 0, 9);
unsafe.compareAndSwapObject(teacher, name, null, "小米");
System.out.println(teacher);
}
}
class Teacher {
volatile int id;
volatile String name;
@Override
public String toString() {
return "Teacher{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
class Cas_UnSafe {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
最后
慢慢的来,别着急!学会有质量的走过每一步
我是代码不会敲的小符,希望认识更多有经验的大佬,也在努力摸索出自己的道路
欢迎添加小符微信:A13781678921,一起加油