文章目录
- 1. ConcurrentHashMap 的底层结构
- 2. ConcurrentHashMap 的元素存储过程
- 3. ConcurrentHashMap 的扩容
- 3.1 扩容的过程
- 3.2 源码分析
1. ConcurrentHashMap 的底层结构
JDK 1.8 的 ConcurrentHashMap 底层数据结构与 HashMap 基本相同,二者在容量机制、Entry 的 hash 值计算及数组 index 下标定位等方面几乎完全一致,这部分读者如感兴趣可参考 Java 8 HashMap 详解
2. ConcurrentHashMap 的元素存储过程
ConcurrentHashMap#putVal()
方法源码如下,从中可以看到其处理有以下几个关键步骤:
- ConcurrentHashMap 不支持 null 键 null 值
- 调用
ConcurrentHashMap#spread()
方法重新计算当前 key 的 hash,核心是高低 16 位异或增大 hash 离散程度- 将 volatile 变量 table 赋值暂存为 tab,for 循环进行元素存储的处理
- 首先判断底层数组是否已经初始化,如果没有则调用
ConcurrentHashMap#initTable()
方法先初始化数组- 根据 key 的 hash 运算确定一个数组下标,调用
ConcurrentHashMap#tabAt()
方法取得该数组下标上的第一个元素,如果该元素为 null 则不存在冲突,直接调用ConcurrentHashMap#casTabAt()
方法 CAS 插入元素- 如果数组下标第一个元素的 hash 值为
MOVED(-1)
,说明这个元素为 ForwardingNode 节点,该节点的存在表明 ConcurrentHashMap 正在扩容中,则此次循环无法插入元素,调用ConcurrentHashMap#helpTransfer()
方法帮助扩容,直到扩容完成才能继续插入元素- 如果数组下标第一个元素与当前 key 存在 hash 冲突,此时使用 synchronized 锁住该元素,如果该元素为链表节点,则直接将新元素封装到节点中插入链表末尾即可;如果该元素为树节点的封装对象,则说明这个下标位置上的链表已经转化为红黑树,调用红黑树的插入方法即可。元素插入后,如果检查到当前数组下标上的链表节点总数已经达到 8 个,则需调用
ConcurrentHashMap#treeifyBin()
方法尝试将其转化为红黑树- 元素存储完毕, 调用
ConcurrentHashMap#addCount()
方法更新元素计数器。如果需要进行扩容检查,则校验当前元素总数是否大于容量控制阈值sizCtl
,是则进行扩容
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
-
ConcurrentHashMap#initTable()
方法逻辑比较简单,重要逻辑如下:- 如果底层数组 table 尚未初始化,则进入 while 循环
- 根据容量控制阈值
sizeCtl
确定当前是否有其他线程在进行初始化操作,如果该值为 -1 表示其他线程正在初始化中,则当前线程让出 CPU 资源,以便初始化操作尽快完成 - 如果调用
Unsafe#compareAndSwapInt()
更新sizeCtl
为 -1 成功,则当前线程执行底层数组的创建,并将sizeCtl
重置为底层数组长度的 0.75 倍,作为扩容阈值来使用
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
-
ConcurrentHashMap#treeifyBin()
方法逻辑比较简练,首先判断底层数组的长度是否小于 64,如是则调用ConcurrentHashMap#tryPresize()
方法尝试 2 倍扩容;其次如果当前数组下标的第一个元素仍然是链表节点,则锁住该节点,将当前链表转化为树节点链表,最终在 TreeBin 构造方法中完成红黑树构造private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } } private final void tryPresize(int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { int rs = resizeStamp(n); if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }
-
ConcurrentHashMap#addCount()
方法首先通过 Unsafe 更新元素计数器,其次如果入参指定了需要检查扩容则判断元素总数是否大于sizeCtl
容量阈值,如果是则需要进入扩容操作。这里的关键逻辑如下:- 当扩容操作还未开始时,
sizeCtl
还存储着容量阈值,此时其为正数,由当前线程使用 Unsafe 将其更新为特定算法计算出来的负数后,再调用ConcurrentHashMap#transfer()
开始扩容操作 - 当
sizeCtl
为负数时,说明扩容操作已经开始了,此时由当前线程使用 Unsafe 将其加 1 作为扩容线程计数,然后调用ConcurrentHashMap#transfer()
传入扩容操作的缓存数组nextTable
进行后续扩容操作
private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
- 当扩容操作还未开始时,
3. ConcurrentHashMap 的扩容
3.1 扩容的过程
从源码来看,ConcurrentHashMap 的扩容时机主要有以下两个:
- ConcurrentHashMap 的元素总数大于容量控制阈值
sizeCtl
- 某个数组下标上的链表元素数量达到树化阈值 8,但是当前底层数组的长度小于 64
整个扩容从触发到处理的过程示意如下,关键步骤做如下补充:
- 当线程 A 添加元素到 ConcurrentHashMap 后,检查发现当前 Map 中的元素总数达到扩容阈值,则触发扩容操作
- 扩容时每条线程每次都只负责一部分下标的元素迁移,迁移进度由偏移量
transferIndex
控制,该值初始化为底层数组长度- 根据 CPU 核数和底层数组长度,分配每轮迁移数组下标的数量,最小为 16。确定该跨度值后,结合
transferIndex
就可以确定当前线程本轮负责迁移的数组下标范围,此时从数组尾部开始进行迁移工作- 线程 A 迁移过程中会在原数组每一个迁移完成的下标上留下一个 Forwarding 节点作为标记,当其他线程识别到标记节点时需要做对应处理。例如线程 B 添加元素时,发现数组下标上的元素是 Forwarding 节点,则知道当前正在扩容中,需要帮助扩容;当线程 B 获取元素时,在该下标上发现了 Forwarding 节点,则说明该下标上的元素都迁移到了新数组,这时候就要调用
ForwardingNode#find()
去新数组查找获取元素- 扩容的进程由
sizeCtl
控制,每多一个线程加入扩容,该值都会加 1。如果没有其他线程加入扩容,则单条线程会在在迁移方法不断将元素移动到新数组,直到transferIndex
为 0- 当
transferIndex
为 0 时,数组上已经没有元素要迁移了,则每条线程退出扩容时将sizeCtl
复位。当sizeCtl
值复位为原负数时,说明这条线程就是最后一条扩容线程,则执行扩容结束后的操作,从尾部开始重新检查原数组中是否有遗漏未迁移的元素,完成后使用迁移后的 nextTable 替换原数组,并将sizeCtl
值重新赋值为新数组的扩容阈值
3.2 源码分析
扩容的触发点比较多,但是无论从哪里触发,最终调用的都是 ConcurrentHashMap#transfer()
方法。需要注意的是,在每条线程加入扩容之前都会通过 U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)
借助 Unsafe 类在 sizeCtl
上计数,该变量在扩容时主要作用是确保由最后一条退出扩容的线程来完成迁移遗漏检查和数组替换。ConcurrentHashMap#transfer()
方法源码如下,可以看到处理大致划分为以下几个部分:
- 根据 CPU 核心数确定每轮迁移数组下标数量,最小为 16
- 如果新数组 nextTab 还没有创建,则按原数组长度 2 倍创建新数组,并初始化
transferIndex
偏移量为原数组长度- for 循环中进行迁移处理的代码大致分为以下几个部分:
- 首先在 while 循环中完成本轮迁移数组下标范围的确定,之后每迁移完一个数组下标则将右边界 i 左移,直到
transferIndex
偏移量为 0,将右边界 i 赋值为 -1- 根据右边界 i 的值计算当前线程本次扩容是否结束。在这部分中,首先通过
U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)
复位sizeCtl
上计数。根据该变量的计算结果确定当前线程是否是最后一条退出扩容的线程,如果是则更新标识位finishing
,将右边界 i 赋值为原数组长度,从原数组尾部重新检查迁移是否有遗漏。当检查完毕,根据标识位finishing
使用新数组替换原数组完成扩容- 右边界 i 从右往左移动,如果遍历过程中原数组下标 i 上没有元素,则只需要在该位置插一个 Forwarding 节点作为标记即可。如果该位置上的元素已经是 Forwarding 节点,则将
advance
置为 true,跳过处理即可。如果在数组下标 i 上存在需要迁移的元素,则先用 synchronized 锁住其头节点,按照数据结构的不同做对应的迁移处理即可,迁移完成后在原数组下标位置留一个 Forwarding 节点作为标记。此处确定元素在新数组下标的算法可以参考 HashMap 扩容机制
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}