文章目录
- 核心变量
- 缓存行填充
- longAccumulate 方法
- 方法概览
- cells 数组已初始化
- 重新计算随机数
- 扩容前置条件
- cells 数组未初始化
- cas 更新 Base
Striped64 的核心是通过分治思想将对 base 的竞争分散到不同的 cell 单元中。
核心变量
// 通过分治的思想将对 base 的竞争分散到不同的 cell 单元中。
transient volatile Cell[] cells;
// 无竞争时直接更新 base
// 有竞争时同时更新 base 和 cells 数组
transient volatile long base;
// Spinlock 自旋锁
transient volatile int cellsBusy;
// Cell 对象封装了每个计算单元的值和常用方法。
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
// 获取 value 属性偏移量
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
缓存行填充
使用 @Contended 注解来支持缓存行填充,避免 value 的缓存行伪共享。
longAccumulate 方法
一言以蔽之:合理分散线程竞争。将对 base 的 cas 分散到 base 和 cells 数组中。
方法概览
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 初始化线程随机数
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
// 为 true 表示没有竞争
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
done: for (;;) {
Cell[] as; Cell a; int n; long v;
// cells 数组已初始化
if ((as = cells) != null && (n = as.length) > 0){...}
// cells 数组未初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()){...}
// 未竞争到自旋锁的线程尝试直接更新 base
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break done;
}
}
if ((as = cells) != null && (n = as.length) > 0){...}
- 若 cell 对象未初始化,则初始化 cell 对象
- 若 cell 对象已初始化则尝试更新 base,更新成功直接 break
- cell 数组长度小于 CPU 个数且 Cell 数组的引用未发生改变则扩容,扩容完成后 continue
else if (cellsBusy == 0 && cells == as && casCellsBusy()){...}
- CAS 修改 cellsBusy 为 1 成功的线程才能进行 cells 数组初始化
else if (casBase(v = base, (fn == null) ? v + x : fn.applyAsLong(v, x)))
- 没有竞争到初始化 cells 数组自旋锁的线程尝试 cas 修改 base,压榨 CPU,避免空转
cells 数组已初始化
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
// 初始化 cell 对象
// 没有其他线程获取到自旋锁,先获取锁再创建 Cell 对象
if (cellsBusy == 0) { // Try to attach new Cell
// 先创建 Cell 对象,后上锁,减少获取锁以后耗时
Cell r = new Cell(x); // Optimistically(乐观) create
// 再次判断有没有线程获取自旋锁,若没有则 CAS 更新 cellsBusy
if (cellsBusy == 0 && casCellsBusy()) {
// 此时已成功将 cellsBusy 修改为 1,即获取到自旋锁
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 再次检测 cells 数组有效且没有线程初始化当前单元 j。
// 理由是当前线程 CAS 获取锁之前,可能有其他线程获取到了锁,然后初始化了单元 j
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 创建成功,退出循环
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// CAS 尝试更新 Cell 对象的值,若更新成功直接返回
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// Cell 数组长度大于等于 CPU 个数或者 Cell 数组的引用发生了改变
else if (n >= NCPU || cells != as)
// 这里把 collide 改为 FALSE 是为了下个线程走到 else if (!collide) 这个条件中
// 从而避免进入 else if (cellsBusy == 0 && casCellsBusy()) 扩容操作
collide = false; // At max size or stale
else if (!collide)
collide = true;
// 竞争自旋锁
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// cells == as 说明数组引用没有变化,则扩容数组为原先的 2 倍
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
// 拷贝数组元素
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 执行到这一行代码
h = advanceProbe(h);
}
重新计算随机数
else if (!wasUncontended)
wasUncontended
为false
,说明存在竞争,会进入else if (!wasUncontended)
这个条件判断- 然后执行
wasUncontended = true;
这行代码,将wasUncontended
更新为true
。 - 然后往下执行到
h = advanceProbe(h);
进行 rehash。 - rehash 之后再次判断
(a = as[(n - 1) & h]) == null
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true;
扩容前置条件
else if (n >= NCPU || cells != as)
- 这里把 collide 改为 FALSE 是为了下个线程走到 else if (!collide) 这个条件中
- 从而避免进入 else if (cellsBusy == 0 && casCellsBusy()) 扩容操作
- 因为 JAVA 中没有 goto 关键字
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {...}
cells 数组未初始化
- 此时 cells 数组未初始化,CAS 修改标志位 CELLSBUSY 成功的线程开始初始化全局数组 cells
- 获取到 cellsbusy 的线程,执行这些代码时,为线程安全
- 通过一个变量 cellsbusy,达到一个 CAS 的 CPU 原语,构建多条指令的原子性
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
// Initialize table
// 若 cells != as 说明存在其他线程修改了 cells 数组的引用
// cells == as 说明数组引用没有变化,则初始化 cells 数组
if (cells == as) {
Cell[] rs = new Cell[2];
// 初始化数组时,顺便初始化一个 cell 对象
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0; // 释放锁
}
// 初始化成功说明 x 已经存放到 cell 对象中放入 cells 数组,直接退出即可
if (init)
break;
}
cas 更新 Base
- 没有竞争到初始化 cells 数组自旋锁的线程尝试 cas 修改 base,压榨 CPU,避免空转
- 修改成功直接 break 退出循环
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break; // Fall back on using base