1. LongAddr底层实现过程
2. Striped64中变量或方法的定义
- base:类似于AtomicLong中全局的value值。在没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上。
- collide:表示扩容意向,false一定不会扩容,true可能会扩容。
- cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁,1表示其他线程已经持有锁。
- casCellsBusy():通过CAS操作修改cellsBusy的值,CAS成功获取锁,返回true。
- getProbe():获取当前线程的hash值。
- advanceProbe():重置当前线程的hash值。
3. LongAddr设计思想
LongAddr基本思想是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突概率就很小。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零分散热点的做法,用空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对下标对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和base都加起来作为最终结果。
4. LongAddr源码分析
更新步骤:
- 若Cells为空,尝试CAS更新base字段,成功则退出;
- 若Cells为空,CAS更新base字段失败,出现竞争,uncontended为true,调用longAccumulate;
- 若Cells非空,但当前线程映射槽为空,uncontended为true,调用longAccumulate;
- 若Cells非空,且当前线程映射槽非空,CAS更新Cell的值,成功则返回,否则,uncontended设为false,调用longAccumulate。
longAdder.increment() =>add()=>longAccumulate()
- add()源码
public void add(long x) {
// cs表示cells引用
Cell[] cs;
// b表示获取的base值, v表示期望值
long b, v;
// m表示cells数组的长度
int m;
// c表示当前命中的单元格
Cell c;
// 进入条件为cs不为空或casBase失败
if ((cs = cells) != null || !casBase(b = base, b + x)) {
// 获取线程id的hash映射
int index = getProbe();
// 无竞争。未出现多个线程竞争到一个槽位里面
boolean uncontended = true;
// cs未创建或长度为0时候
if (cs == null || (m = cs.length - 1) < 0 ||
// hash值对应的数组为空,需要进行初始化
(c = cs[index & m]) == null ||
// 对应的单元格cas失败
!(uncontended = c.cas(v = c.value, v + x)))
// 新建或者扩容
longAccumulate(x, null, uncontended, index);
}
}
- cas比较并交换
final boolean casBase(long cmp, long val) {
return BASE.weakCompareAndSetRelease(this, cmp, val);
}
- longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended, int index)
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended, int index) {
if (index == 0) {
ThreadLocalRandom.current(); // force initialization
index = getProbe();
wasUncontended = true;
}
for (boolean collide = false;;) { // True if last slot nonempty
Cell[] cs; Cell c; int n; long v;
if ((cs = cells) != null && (n = cs.length) > 0) {
if ((c = cs[(n - 1) & index]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & index] == null) {
rs[j] = r;
break;
}
} finally {
cellsBusy = 0;
}
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
index = advanceProbe(index);
}
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
try { // Initialize table
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[index & 1] = new Cell(x);
cells = rs;
break;
}
} finally {
cellsBusy = 0;
}
}
// Fall back on using base
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
}
}
5. longAccumulate
// x为默认值, fn默认null, wasUncontended为false代表竞争失败了
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended, int index) {
// 强制获取hash值
if (index == 0) {
ThreadLocalRandom.current(); // force initialization
index = getProbe();
wasUncontended = true;
}
for (boolean collide = false;;) { // True if last slot nonempty
Cell[] cs; Cell c; int n; long v;
// cells已经被初始化了
if ((cs = cells) != null && (n = cs.length) > 0) {
if ((c = cs[(n - 1) & index]) == null) {
// double check
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & index] == null) {
rs[j] = r;
break;
}
} finally {
cellsBusy = 0;
}
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
index = advanceProbe(index);
}
// cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
try { // Initialize table
if (cells == cs) {
// 新建长度为2的数组
Cell[] rs = new Cell[2];
// 将值映射到cells中
rs[index & 1] = new Cell(x);
cells = rs;
break;
}
} finally {
cellsBusy = 0;
}
}
// Fall back on using base,cells正在进行初始化,则尝试直接在base上进行累加操作
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
}
}
6. LongAddr在并发情况下sum的值不精确
sum执行时,并没有限制对base和cells的更新。所以LongAddr不是强一致性的,它是最终一致性。
public long sum() {
Cell[] cs = cells;
long sum = base;
if (cs != null) {
for (Cell c : cs)
if (c != null)
sum += c.value;
}
return sum;
}