新人写手,代码菜鸡;笔下生涩,诚惶诚恐。
初试锋芒,尚显青涩;望君指点,愿受教诲。
本篇文章将从源码的层面,探讨ConcurrentHashMap的存储流程以及扩容原理
Java版本为JDK17,源代码可能与其他版本略有不同
推荐阅读:HashMap实现原理、扩容机制
一、构造函数
1.1 无参构造函数
ConcurrentHashMap的无参构造函数是一个空方法
public ConcurrentHashMap() {
}
1.2 指定容量、负载因子
多了一个并发级别concurrencyLevel,没看出来大用途emm......
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// 找到大于等于 容量 / 负载因子 + 1 的2的幂
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
1.3 传入Map
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
二、关键成员变量
扩容相关的变量都用到了volatile关键字作修饰,保证变量并发可见性
// 默认大小,树化阈值等与HashMap相同
// ...
// 存储元素的Node数组
transient volatile Node<K,V>[] table;
// 协助扩容会用到
private transient volatile Node<K,V>[] nextTable;
// 大于0时,为扩容阈值,等于0时,为初始状态,小于0时,表示Map处于扩容中等一些特殊状态
// 扩容时,该值的低十六位用于表示共同扩容线程数
private transient volatile int sizeCtl;
// 扩容下标指针,用于记录数组扩容到了哪一个元素,从数组末尾开始
private transient volatile int transferIndex;
// 使用Unsafe高效读取和修改变量值
private static final Unsafe U = Unsafe.getUnsafe();
private static final long SIZECTL
= U.objectFieldOffset(test.ConcurrentHashMap.class, "sizeCtl");
private static final long TRANSFERINDEX
= U.objectFieldOffset(test.ConcurrentHashMap.class, "transferIndex");
// 扩容标志位数
private static final int RESIZE_STAMP_BITS = 16;
// 最大允许扩容线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 扩容标志位左移位数,扩容时用于将sizeCtl置为负数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 表示节点的状态,一般扩容时会将正在扩容的节点hash值修改为以下值
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
// 系统可用线程数,用于计算扩容步长
static final int NCPU = Runtime.getRuntime().availableProcessors();
三、存储流程
3.1 put流程
put方法会调用putVal方法,业务逻辑都在putVal中
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 和 value都不能为null
if (key == null || value == null) throw new NullPointerException();
// key 的 hash 值
int hash = spread(key.hashCode());
int binCount = 0;
// 并发场景基本都用到了for循环,执行插入时,若map正在扩容导致插入失败
// 待扩容完成后可在此尝试插入
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
// table为null或长度为0,执行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果当前table数组i位置没有元素,直接以cas方式插入当前待插入节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
// 如果当前当前节点元素hash值为-1,表示正在扩容(扩容时,扩容节点hash值会置为-1)
else if ((fh = f.hash) == MOVED)
// 则辅助进行扩容,具体后面说
tab = helpTransfer(tab, f);
// 走到这里,说明table[i]位置节点存在,并且数据不在扩容
// 和HashMap类似,onlyIfAbsent为true时,
// 仅当节点不存在时,才会插入;
// 如果存在,不会更新节点值
// 同样的,判断节点相同,需要hash值和key值都一致
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
// 锁住当前节点
synchronized (f) {
// 再次判断当前节点是否发生了变化,防止被其他线程提前修改
if (tabAt(tab, i) == f) {
// fh < 0时,代表三个特殊状态
// static final int MOVED = -1; 当前节点正在扩容
// static final int TREEBIN = -2; 当前节点是一颗树
// static final int RESERVED = -3; 保留当前位置
// 这里fh >= 0表示当前节点f为单节点(不是树或链表),或者为链表
if (fh >= 0) {
// 链表长度
binCount = 1;
// 开始寻找待插入节点在链表中的位置
// 找不到则直接挂载到链表尾部
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到了,直接更新数据(当然onlyIfAbsent为false)
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 如果链表最后都没有找到,新建一个Node挂到后面
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
// 如果为树,则遍历树,更新或插入节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
// 防止递归更新产生死锁
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
// 如果链表长度大于等于8,尝试将链表转化为树
// 只有table容量大于等于64才会尝试转化,否则优先扩容
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 计数+1,并检查是否需要扩容(是否达到扩容阈值)
addCount(1L, binCount);
return null;
}
- 由于需要保证线程安全,所以并不是每一个插入操作都是一次成功(如插入时Map正在扩容),所以用到了for循环;
- 插入元素时,用到了Synchronized锁,锁粒度具体到Node[]数组中的某个Node节点;
- 加锁后又判断了一次if (tabAt(tab, i) == f),类似双重校验锁,ConcurrentHashMap大量使用到了这种处理方式。
3.2 get流程
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 获取hash值
int h = spread(key.hashCode());
// 确保能查到元素,否则直接返回null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// hash和key都相同,返回值
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 节点hash值小于0,表示可能处于扩容状态
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表,继续查找key相同的节点,返回值
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
// 类似,遍历链表查找key
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
四、扩容原理
在说传参为Map的构造函数的时候,见到过这样的方法
public void putAll(Map<? extends K, ? extends V> m) {
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putVal(e.getKey(), e.getValue(), false);
}
接下来就从tryPresize方法入手
4.1 tryPresize 尝试扩容
private final void tryPresize(int size) {
// 如果size >= max / 2,扩容后容量就用max
// 否则使用1.5 * size + 1向上扩为2的幂次方
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// sizeCtl >= 0表示不处于扩容等异常状态
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// putAll方法可能会执行此分支,table为空
if (tab == null || (n = tab.length) == 0) {
// 初始化长度sc够就用sc,否则用前面算出来的c
n = (sc > c) ? sc : c;
// cas把当前map状态置为扩容中,确保只有一个线程进入
if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
// 确保table引用没被修改
// 创建新数组
// 将sizeCtl赋值为容量的0.75倍
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
// 数组容量已经大于所需要的容量,或者容量达到最大值
// 无需扩容,直接退出循环
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
// 获取扩容标志
// 假设n = 16
// 此时rs为1000 0000 0001 1011
int rs = resizeStamp(n);
// 扩容标志会使得低到高第16位为1,左移16位,保证sc为负数
// 此时sc修改为1000 0000 0001 1011 0000 0000 0000 0010
// 即-2145714174
if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
- if (U.compareAndSetInt(this, SIZECTL, sc, -1))保证只有一个线程执行new Node[]操作
- 扩容标志,用在协助扩容的时候,识别两个线程处于同一个扩容任务中
4.2 transfer 扩容方法
计算步长与初始化
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 每个线程迁移数据的步长,数组长度 / 8 / cpu可用线程数
// 最小为16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 初始化走这
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 新建数组,容量为之前的两倍,并赋值
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 赋值给volatile变量
// private transient volatile Node<K,V>[] nextTable;
nextTable = nextTab;
// private transient volatile int transferIndex;
transferIndex = n;
}
// ......
}
线程接受扩容任务
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// ......
// 新数组长度
int nextn = nextTab.length;
// 扩容节点,挂载的是新数组
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 任务标识符,为true时表示需要接受扩容任务
boolean advance = true;
// 结束标识符,为true时表示扩容结束
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 需要接受任务时
while (advance) {
int nextIndex, nextBound;
// 第三个分支i被赋值后或扩容结束走此分支
// 表示当前线程已接受任务
if (--i >= bound || finishing)
advance = false;
// 数组所有节点扩容任务均分配完成
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 修改扩容下标,假设需要扩容的数组长度为32,步长16
// 则每次循环transferIndex会减16,直到减为0
// nextBound记录当前循环到的transferIndex下标
// i赋值为nextIndex-1,(bound - i)即为当前线程需要数据迁移的数组下标
// 接受过任务,将advance置为false,不再进入下一循环
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
}
}
bound -> i即为当前线程需要进行数据迁移的数组下标。循环结束时,若此时没有其他线程协助扩容,当前线程会再次接受新的一批迁移任务
扩容后处理
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// ......
// ......
for (int i = 0, bound = 0;;) {
// ......
// 当前线程没有分配到任务时会满足i < 0条件
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 扩容结束,nextTable置为null
// nextTab赋值给table
// 阈值置为新数组容量的0.75倍
// 第一次进入为false,之后finishing被赋值为true,执行finishing逻辑
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// cas使sizeCtl = sizeCtl - 1
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 第一个线程进入此,不会走下面的if
// 进入transfer方法时,满足(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
// 第二次进入,sc = sizeCtl比原来的值少1,可以进入if条件,执行return;
// 这里的判断,是为了检查所有辅助扩容线程是否均已扩容完毕
// 每个协助扩容线程都会将sc + 1
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 扩容结束
finishing = advance = true;
// i重新赋值为数组长度,让最后一个线程检查数组是否迁移完毕,并赋值sc为新的扩容阈值
i = n; // recheck before commit
}
}
// ......
}
}
数据迁移
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// ......
// ......
for (int i = 0, bound = 0;;) {
// ......
// ......
// 走到之后的分支意味着线程接收到了任务
// 如果数组i位置为null,不需要迁移,并将数组i位置hash置为MOVED,代表已经迁移过
else if ((f = tabAt(tab, i)) == null)
/*
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null);
this.nextTable = tab;
}
fwd hash为-1,key、value均为null
*/
advance = casTabAt(tab, i, null, fwd);
// hash为-1,说明该节点已经迁移过
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 进入节点迁移逻辑
else {
// 上锁,锁的是node数组f节点
synchronized (f) {
// 校验引用
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 单节点或链表部分
if (fh >= 0) {
// 和HashMap类似,将节点hash与n进行按位与分为两组
// 拆成低位链表和高位链表
// 低位链表放在新数组i位置,高位链表则为n + i位置
int runBit = fh & n;
Node<K,V> lastRun = f;
// 这里的循环是为了找到链表最后面的节点族的第一个节点
// 即保证lastRun节点及其之后的节点同属于高/低位链表
// 如此设计,最后遍历链表遍历到lastRun即可,因为lastRun之后的节点都和lastRun同属于一批
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 先将最后的节点族赋值
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 再次遍历f节点链表,开始构建高/低位链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 除了lastRun及其之后的链表是尾插法,其余遍历的链表采用头插法
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 新数组赋值,并将旧书组i位置置为fwd,标记已经迁移
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// 树节点同样分为高低位数
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 拆分成两棵树后,判断长度是否小于6,决定是否需要红黑数链表化
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// 递归更新抛出异常
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
}
}
}
这里的链表迁移与HashMap略有不同:
HashMap的高低位链表,相对顺序与原顺序相同;
而ConcurrentHashMap,lastRun及其之后的节点顺序保持一致,在链表尾部。其他的节点会以头插法的方式加入到新数组中,如下图,橙色1 4 5为低位,2 3 6 7为高位
4.3 helpTransfer 协助扩容
putVal方法中,当前节点hash为-1时,线程会协助进行扩容
// 如果当前当前节点元素hash值为-1,表示正在扩容(扩容时,扩容节点hash值会置为-1)
else if ((fh = f.hash) == MOVED)
// 则辅助进行扩容,具体后面说
tab = helpTransfer(tab, f);
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 旧数组不为空,并且当前f节点为ForwardingNode
// 并且f.nextTable也不为空,transfer方法中,nextTable为新数组
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 扩容戳
int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT;
// 新老数组都是同一个,并且sizeCtl处于扩容状态,协助进行扩容
// 不相同可能是扩容完成了,或者发生了新的扩容
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 扩容时,sizeCtl低16位代表扩容的线程数,首次进入时被设置为了2
// 当此时同时扩容数达到上限,或者开始执行最终扩容检查时(最终检查时,sizeCtl == rs + 1)
// 或者扩容下标为0(任务全部分配)时,退出
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
transferIndex <= 0)
break;
// sizeCtl + 1,表示此时扩容线程+1
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
// 协助扩容
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
transfer方法中,将nextTab赋值给了volatile变量nextTable,旧数组长度n赋值给了transferIndex,此处用来判断是否是同一个扩容任务以及快速判断扩容是否完成。
结语:相对于HashMap,ConcurrentHashMap源码稍微绕一点。本文起到辅助理解作用,其他方法(remove等)原理类似,感兴趣可自行阅读源码。