ConcurrentHashMap 是我们日常开发中使用频率最高的并发容器之一了,具有如下特点:
基于JDK8分析
- 存储结构和
HashMap
一样,都是数组
+链表
+红黑树
- 是
线程安全
的容器,底层是通过CAS
自旋 +sychronized
来保证的 - key 和 value 都不允许为空,否则将抛出空指针异常
- 对比
HashTalbe
,锁的粒度控制在数组的桶元素上 - 可以多线程协助扩容
接下来从源码的角度来分析下 ConcurrentHashMap。
1. 关键属性
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
/**
* 数组的最大长度 (和HashMap一样)
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 数组的默认长度 (和HashMap一样)
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* 数组中的元素个数最大值,超过这个阈值将抛出OOM
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* The default concurrency level for this table. Unused but defined for
* compatibility with previous versions of this class.
*
* 翻译过来是默认的并发级别,但是并没有使用
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 加载因子
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 当链表长度达到8,转成红黑树
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 当红黑树的长度小于6,转成链表
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 当链表长度大于8,但是数组table长度没有达到64,此时不会转成红黑树,而是扩容
*/
static final int MIN_TREEIFY_CAPACITY = 64;
/**
* 最小的迁移步长,当多线程并发扩容时,每个线程负责的迁移的最小桶的个数
*/
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* The maximum number of threads that can help resize.
* Must fit in 32 - RESIZE_STAMP_BITS bits.
* 表示并发扩容最大线程数
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**
* The bit shift for recording size stamp in sizeCtl.
* 和扩容相关,用来标识sizeCtl的符号位
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* Encodings for Node hash fields. See above for explanation.
*/
//旧数组迁移后,桶位会放入ForwardingNode节点,hash值固定为-1
static final int MOVED = -1; // hash for forwarding nodes
//红黑树在桶中元素的节点(TreeBin)的hash值固定为-2
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
//保证hash值都是正数
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
/*
* 处理器的核数
*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* table
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val; //value是volatile的
volatile Node<K,V> next; //next是volatile的
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
//.......
}
/* ---------------- Fields -------------- */
/**
* table数组,长度总是2的N次幂
*/
transient volatile Node<K,V>[] table;
/**
* 扩容时使用的
*/
private transient volatile Node<K,V>[] nextTable;
/**
* 记录元素个数的基准值
*/
private transient volatile long baseCount;
/**
* table数组初始化和扩容的控制符,不同值有不同的含义:
* 1.如果值为-1,则表示table数组正在初始化;-(n + 1) 表示表示此时有n个线程正在共同完成数组的扩容操作
* 2.如果值为0,表示table数组还没有被初始化,默认长度是16
* 3.值为正数,如果数组没有初始化,则记录的是数组的初始容量;如果数组已经初始化,记录的是数组的扩容阈值(capacity * loadFactor)
*
* 这个值是非常重要的
*/
private transient volatile int sizeCtl;
/**
* 迁移时的索引
*/
private transient volatile int transferIndex;
/**
* 自旋锁标志位
* 在在调整CounterCell[]大小和或创建CounterCell[]时使用。
*/
private transient volatile int cellsBusy;
/**
* 当put元素并进行计数时,优先对baseCount进行加1操作,但是并发度高的情况下,只有一个线程能
* 成功,其他失败的线程将会使用CounterCell进行统计。
* 这样元素的总个数 = baseCount + CounterCell[]数组各个下标对应值的总和
*/
private transient volatile CounterCell[] counterCells;
}
复制代码
sizeCtl 属性着重说明一下:
- 如果值为 -1,则表示table数组正在
初始化
;-(n + 1) 表示表示此时有多个线程正在共同完成数组的扩容操作 - 如果值为0,表示table数组还没有被初始化,默认长度是16
- 值为正数,如果数组没有初始化,则记录的是数组的初始容量;如果数组已经初始化(table != null),记录的是数组的扩容阈值(capacity * loadFactor)
2.构造器分析
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
/**
* 如果使用的空参数构造器,那么默认的长度16
*/
public ConcurrentHashMap() {
}
/**
* 指定初始容量
* 如果初始容量大于(最大值/2 = 2^29),那么初始容量就是默认的最大值2^30
* 否则去初始容量的(1.5倍+1),然后计算出大于该值的最小的2的N次幂
*
*
* 同时,将初始容量赋给sizeCtl, 表示数组还没有初始化
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
//.....其他构造器......
}
复制代码
tableSizeFor()
方法并不陌生,前面讲HashMap的时候,我们有提到,不过二者在计算数组的初始容量稍有差异,HashMap
直接对指定容量进行位运算,计算出大于等于指定容量的最小的2的N次幂;而ConcurrentHashMap
是先对指定容量进行(1.5倍 + 1
), 然后对计算后的值进行位运算,得到大于该值的最小的2的N次幂。
HashMap
: 7 ——> 8,17 ——> 32,16 ——> 16,1 ——> 1ConcurrentHashMap
: 7 ——> 16,17 ——> 32,32 ——> 64,1 ——> 2
3.put()方法分析
测试数据:
public class Test {
public static void main(String[] args) {
//此时初始容量是64,由于内部的table还没有初始化,所以sizeCtl属性值为初始容量64
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(32);
map.put("abc", 12);
}
}
复制代码
源码分析:
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key和value 都不允许为空
if (key == null || value == null) throw new NullPointerException();
/**
* 通过扰动函数,计算出hash值,高16位也参与运算,这个和HashMap一样
* 不过计算出结果后,它会 &0x7fffffff,其目的是,这样计算出来的hash值都是正数!
*/
int hash = spread(key.hashCode());
int binCount = 0;
//死循环,搭配CAS做自旋用的
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果数组是空的,则需要初始化table数组 ①
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//数组不为空,计算当前key在数组中桶的下标,如果为空,则表示没有放元素,则通过cas设置元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; //设置成功,break
}
//如果桶中元素的hash值为-1,表示当前数组正在扩容
else if ((fh = f.hash) == MOVED)
//当前线程协助扩容
tab = helpTransfer(tab, f);
else { //否则就需要插入到链表或者红黑树中
V oldVal = null;
//对数组中的桶节点元素加锁
synchronized (f) {
//double check, 避免其他线程将树转成了红黑树,或者其他线程移除了该元素
if (tabAt(tab, i) == f) {
/**
* fh >= 0, 表示普通的链表节点
* 能走到这里,说明数组中的桶元素不为空,同时hash值也不是-1
* 同时,如果是红黑树,则桶中元素放到的是TreeBin节点,它的hash值固定是-2,
* 所以,如果 fh>=0,则是普通链表
*/
if (fh >= 0) {
binCount = 1;
//整个for循环是处理链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//插入到红黑树中
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果链表长度大于等于8,则将链表转成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//元素个数+1
addCount(1L, binCount);
return null;
}
复制代码
接下来我们研究一下功能点的细节方法
3.1 表的初始化——initTable()
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//while循环+CAS形成自旋
while ((tab = table) == null || tab.length == 0) {
/*
* 1.如果值为-1,则表示table数组正在初始化;-(n + 1) 表示表示此时有n个线程正在共同完成数组的扩容操作
* 2.如果值为0,表示table数组还没有被初始化,默认长度是16
* 3.值为正数,如果数组没有初始化,则记录的是数组的初始容量;如果数组已经初始化,记录的是数组的扩容阈值(capacity * loadFactor)
*
*/
//如果值小于0,说明正在初始化或者扩容,此时当前的线程就直接放弃CPU的使用权
//由运行状态变为就绪状态,再次获得CPU的控制权后接着往下执行,这里也就是继续执行while循环
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
/**
* 在测试数据中,我们指定了初始容量,在没有初始化的情况下,sizeCtl等于初始容量也就是64 > 0
* 利用CAS将sizeCtl修改为-1
*
* U.compareAndSwapInt(this, SIZECTL, sc, -1)
* 如果SIZECTL(就是sizeCtl在内存中的偏移量)和sc相等,则将SIZECTL(就是sizeCtl)修改为-1
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
/*
* 为什么要double check ?
* 因为finally 中 sizeCtl = sc; 此时 sizeCtl是扩容阈值,其他线程
* 来到else if 判断为true, 则也可以进来,这样就重复初始化了
*/
if ((tab = table) == null || tab.length == 0) {
//这里的sc还是初始容量,请看指定容量的构造器方法
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
/**
* 重新计算sc: 因为table已经初始化了,所以这sc值在这里就是扩容阈值了
*
* n - (n >>> 2) = n - n/4 = 3/4(n) = 0.75 * n
* sc = 0.75 * table.length
*/
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
复制代码
初始化完成之后,就不走其他判断了,直接重新循环,此时table已经不为空了,然后根据key计算在数组中的索引,判断是否为空,如果为空,则通过CAS插入元素。
3.2 CAS插入元素
这里比较简单,如果数组中该索引位置没有元素,则直接通过CAS插入即可;如果插入失败,则说明有其他线程先一步插入了该位置,则CAS失败,此时就重新for循环然后插入到链表位置。
接下来我们先看下如果出现hash冲突如何解决,协助扩容晚点再探究。
3.3 hash冲突
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key和value 都不允许为空
if (key == null || value == null) throw new NullPointerException();
/**
* 通过扰动函数,计算出hash值,高16位也参与运算,这个和HashMap一样
* 不过计算出结果后,它会 &0x7fffffff,其目的是,这样计算出来的hash值都是正数!
*/
int hash = spread(key.hashCode());
int binCount = 0;
//死循环,搭配CAS做自旋用的
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//....省略其他else if........
//hash碰撞的处理逻辑
else {
V oldVal = null;
//对数组中的桶节点元素加锁
synchronized (f) {
//double check, 避免其他线程将树转成了红黑树,或者其他线程移除了该元素
if (tabAt(tab, i) == f) {
/**
* fh >= 0, 表示普通的链表节点
* 能走到这里,说明数组中的桶元素不为空,同时hash值也不是-1
* 同时,如果是红黑树,则桶中元素放到的是TreeBin节点,它的hash值固定是-2,
* 所以,如果 fh>=0,则是普通链表
*/
if (fh >= 0) {
binCount = 1;
//处理链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//插入到红黑树中
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果链表长度大于等于8,则将链表转成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//元素个数+1
addCount(1L, binCount);
return null;
}
复制代码
不难发现,如果出现hash冲突,则锁住当前桶位,对比HashTable
, 锁的粒度直接从锁整个表下降到锁表中的某一个桶位,并发度将大大提高。如果是链表,则采用尾插法插入到链表尾部;如果是红黑树,则将元素插入到红黑树中;如果链表长度 >=8,则将链表转成红黑树。接下来我们简单看下链表转成红黑树的逻辑:
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
/**
* 链表长度大于8,但是数组长度没有达到64,此时不会转成红黑树,而是先扩容
* 这一点和HashMap一样
*/
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1); //扩容2倍,扩容的逻辑放到后面再说
/**
* 1.找到桶中的元素,并且不为空
* 2.hash值要大于0,为什么?
* 因为桶中元素的hash值小于0,只有两种情况
* 1)数组正在扩容,旧数组迁移后的桶的位置会放一个 ForwardingNode 节点,它的hash值是-1
* 2)桶中的元素已经是树了,此时桶中元素的hash值是-2
*/
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
//锁住当前桶中的元素,开始树化
synchronized (b) {
/**
* double check
* 为什么要double check ?
* 因为有可能当前线程进来了else if, 但是还没有执行sychronized获得锁之前,有其他线程
* 已经获得了这个锁,然后开始树化,树化后节点元素可能随着左旋和右旋而改变,所以这里必须要double check
*/
if (tabAt(tab, index) == b) {
//整个for循环就做一件事,将当前的单向链表转成双向链表
//hd 是双向链表的头节点
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
/**
* 将当前桶的位置设置为 TreeBin 节点,然后将双向链表通过构造器传入到
* 内部,然后再内部将双向链表转成红黑树。
* 注意:桶中的TreeBin 的hash值是 -2
*/
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
复制代码
我们用一张图来展示下红黑树的存储结构:
3.4 统计元素个数
我们看下addCount()
方法的内部实现:
这个方法内部主要做两件事:一个是统计元素个数,另一个是扩容
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
/**
* 1.统计元素个数
* 优先对baseCount进行加,但是高并发下,只能有一个线程通过CAS添加成功,其他的线程CAS会失败
* 此时那些失败的线程就会跳过对baseCount的运算,而是去对CounterCell[]数组中的元素进行计数
* 这样总的元素个数 = baseCount + CounterCell[]数组各个元素的和
*/
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
/**
* 如果check >= 0, 则需要判断是否扩容,因为remove()的话,也会调用调用这个方法
* 传入的是 -1,此时是不需要扩容的。
*
*/
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
复制代码
我们先用一张图来整体概括下计数的逻辑:
我们来详细分析下统计元素个数的逻辑:
参考JUC下的原子类
LongAdder
,他们的逻辑是一模一样的。使用了分段CAS的思想
/**
* 1.统计元素个数
* 优先对baseCount进行加,但是高并发下,只能有一个线程通过CAS添加成功,其他的线程CAS会失败
* 此时那些失败的线程就会跳过对baseCount的运算,而是去对CounterCell[]数组中的元素进行计数
* 这样总的元素个数 = baseCount + CounterCell[]数组各个元素的和
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
/**
* 不难发现,一旦并发比较高,对baseCount加失败后,就会对counterCells计数,之后就都会
* 对 counterCells 计数,不在对baseCount计数了。
*
* 一旦CAS对baseCount计数成功,就退出了
* 一旦CAS对baseCount计数失败,取反就是true,开始初始化counterCells然后计数
*
*/
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//对counterCells计数的核心逻辑
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//......先省略扩容的逻辑........
}
复制代码
接下来我们看下对 counterCells
计数的逻辑:
核心逻辑主要就是集中在这3个判断中:
- 第一个判断主要是对
counterCells
数组进行扩容,每次也是以2的N次幂进行扩容,初始容量是2 - 第二个判断主要是初始化
counterCells
数组 - 第三个判断是如果对
counterCells
计数失败了,回过头来在对baseCount
计数,吃回头草。
接下来我们在详细展开看看:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
/**
* h: 可以理解为一个随机数,用来计数counterCells数组索引位置的
* h = ThreadLocalRandom.getProbe()) == 0 说明还没有初始化,调用localInit()初始化,产生一个随机数
*/
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
//碰撞的意思
boolean collide = false; // True if last slot nonempty(如果最后一个槽位非空则为True)
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
/**
* 1.这里已经完成了对counterCells的初始化操作。我们先看步骤2判断
*
*/
if ((as = counterCells) != null && (n = as.length) > 0) {
/**
* 1.1计算counterCells数组中元素的位置是否为空
*
* 无论h是什么,假如n就是初始容量2,h&1位运算结果只能是0或1(因为h只有最后一个位参与运算)
* (a = as[(n - 1) & h]) == null 说明counterCells数组的位置没有值
*
*/
if ((a = as[(n - 1) & h]) == null) {
//cellsBusy == 0 说明counterCells没有初始化也没有扩容
if (cellsBusy == 0) {
//创建一个 CounterCell 对象,将保存到 counterCells数组中
CounterCell r = new CounterCell(x); // Optimistic create
/**
* 这里为什么还要对cellsBusy CAS 判断呢?
* 说白了,来到这里也有可能是多个线程,所以必须确保只能有一个操作成功
*/
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
//依然double check, 因为有可能执行完finally逻辑后,另一个线程就进来了
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//Uncontended 是无竞争的意思
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
/**
* 尝试用CAS对该单元格的值累加,如果成功则退出
*/
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
/**
* 多次尝试操作后,依然失败了,说明对couterCells并发非常高,则此时尝试扩容
* 扩容为原来的2倍(也是2的N次幂)
*/
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//产生一个新的随机数
h = ThreadLocalRandom.advanceProbe(h);
}
/**
* 2.初始化counterCells数组,先看这里。
* cellsBusy == 0 表示不忙,也就是还没有初始化,然后通过CAS将其修改为1,表示忙
* 在搭配double check 这样其他线程就无法再进来重复初始化了
*
*/
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
// double check
if (counterCells == as) {
//创建一个长度为2的数组
CounterCell[] rs = new CounterCell[2];
/**
* 计算索引位置,h是一个随机数
* h & 1 的结果要么是0,要么是1,因为无论h是什么,它只有最后1位参与位运算
* 索引,这个x值可能放在index = 0的位置,或者放到index = 1的位置
*/
rs[h & 1] = new CounterCell(x);
//赋值,完成初始化操作
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//如果对counterCells计数失败,回过头来在对baseCount计数
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
复制代码
整个流程我们用下面这个图概括下:
如何计算元素个数呢?
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
//先取出baseCount的值
long sum = baseCount;
//再取出counterCells数组中每个单元格的值
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
复制代码
好了,关于统计元素个数就梳理到这里,接下来我们看下是否扩容的逻辑。
3.5 扩容
private final void addCount(long x, int check) {
//......省略统计元素个数的逻辑........
//如果是remove元素,则不要走扩容逻辑了,remove传过来的是-1
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
/**
* s = sumCount() 就是元素个数
* 因为table已经完成了初始化(table != null),所以sizeCtl此时表示扩容阈值
* s >= (long)(sc = sizeCtl): 判断集合元素个数是否大于扩容阈值,超过则扩容
*/
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
/**
* 内部实现:Integer.numberOfLeadingZeros(n) | (1 << (16 - 1));
* 先不看前面,二者按位或之后第17位肯定是1,其他先不管
*/
int rs = resizeStamp(n);
/**
* sc < 0 是什么情况?很明显就是扩容了
*
* 假设a,b两个线程同时来到while这里,a线程优先执行了else if, 此时将会把
* sizeCtl修改为负数,此时b线程判断 sc < 0 就成立了。
*
* 假如a线程执行完CAS判断后,还没有执行transfer(tab, null)方法,此时切换
* 到b线程,由于nextTable == null, 所以b线程直接退出了。这怎么理解 ?
* 1)b线程已经将元素添加到old table中了
* 2)元素个数已经统计完毕了
* 所以,直接让b线程退出,由a线程完成迁移工作就可以了。。。。。
*
* 假如a线程执行完CAS判断后,开始执行transfer(tab, null)方法,nextTable != null
* 此时切换到b线程,b线程执行CAS判断后,执行transfer(tab, nt) 方法,这里怎么理解?
* --这里就是协助扩容了。
*
*
*/
if (sc < 0) {
/**
* 判断是否已经扩容结束了,nextTable就是扩容后的新数组
* 扩容完成后,会将nextTable 赋给 table
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//协助扩容
transfer(tab, nt);
}
/*
* 将rs左移16位,使sizeCtl变成负数
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//当前线程是触发扩容的第一个线程
transfer(tab, null);
s = sumCount();
}
}
}
复制代码
我们先来看下rs
变量的含义:
static final int resizeStamp(int n) {
//RESIZE_STAMP_BITS = 16
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
复制代码
首先将1左移15位:
00000000 00000000 00000000 00000001
00000000 00000000 10000000 00000000 #左移15位,前面15位去掉,后面补15位
复制代码
不管
Integer.numberOfLeadingZeros(n)
的值是什么,他们按位或的结果,其中第17位一定是1.
我们继续看 else if 中的CAS的逻辑:
//RESIZE_STAMP_SHIFT = 16
else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))
复制代码
其中要对rs的值左移16位:
00000000 00000000 10000000 00000000 #rs
10000000 11100111 00111111 11111000 # rs << 16
复制代码
rs 左移16位,会将第17位的1移动到首位,这意味着什么?意味着rs 是一个小于0的数值。只看首尾就好,其他位是什么不重要。
接下来我们就看下扩容的核心逻辑(非常复杂):
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
/**
* 这个方法是可以完成多个线程的共同扩容的
* 这里的含义就是对旧数组分块,每个线程负责多少块(步长)的数据迁移。
* 默认是每个线程负责16个桶位的迁移
* 当然了,如果是单核,那就完全没有必要使用多线程扩容了,数组长度是多少,线程就负责多少个元素(步长)
*/
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
//创建新的数组,长度为原来的2倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//方便协助扩容的线程拿到新表
nextTable = nextTab;
//transferIndex 就是旧数组的长度
transferIndex = n;
}
int nextn = nextTab.length;
/**
* 创建一个 fwd 节点,它的hash值固定为 -1
*/
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//迁移推进标识
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
/**
* 这个while的逻辑:
* 1)给每个线程分配迁移的任务(默认16个长度)
* 2) 从后往前一个一个迁移
*/
while (advance) {
int nextIndex, nextBound;
/**
* boud 是一个任务段的边界值,请看下面的图
* 没有达到边界值之前,一个一个元素的迁移
*/
if (--i >= bound || finishing)
advance = false;
//迁移到最后一个元素了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
/**
* 这里的逻辑就是为每个线程分配一个任务段(默认就是16个桶位的长度)
*
*/
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
/**
* 这个if的判断就是:检查是否所有线程都迁移完成
*
*
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
/**
* 所有线程都迁移完成
*/
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
/**
* 再次review旧数组每个节点,确保没有遗漏的
* 来到这里,i = oldTable.legnth, bound = 0
*/
i = n; // recheck before commit
}
}
//从这接下来的判断就是真正的迁移了
//如果旧数组位置是null,那干脆不用迁移了,直接放一个fwd节点,告诉其他线程正在扩容,
//不要再往旧数组放了,你可以过来协助扩容
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//说明已经迁移过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//迁移真正有数据的桶位
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//处理链表
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//处理红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
复制代码
整个迁移的逻辑非常的复杂,并且代码很长,我们将其拆开看:
3.5.1 对迁移步长的理解
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//构建每个线程迁移的步长,默认每个就是16个
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
/**
* 穿件扩容后新的table, 长度为原来的2倍
*/
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//赋给nextTable, 方便其他协助扩容的线程能否拿到新表
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
/**
* 构建一个fwd节点,hash值固定为-1
*/
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
//从后往前遍历旧数组,每迁移一个就把旧数组的位置放一个fwd节点
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//省略for循环的其他逻辑........
}
}
复制代码
我们将重点关注代码截图展示:
- 首先线程进来先执行1处,它就是给每个线程分配迁移的任务段,默认就是16个,bound就是本次线程负责迁移的段尾
- 然后循环执行2处,从后往前一个一个迁移数据,迁移完成后,将fwd节点覆盖
- 表示整体迁移结束了
我们用一个图来表示下它的分段:
我们在用一个图来展示下
3.5.2 迁移数据
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//....省略其他代码......
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//1.省略分配任务的while循环
//2.省略迁移完成的判断
//接下来就是迁移工作了
//如果旧数组位置是null,则不用迁移,直接放一个fwd节点即可
//如果其他线程想要put元素,发现是fwd节点,则知道在扩容,此时将会参与扩容,将数据放到新数组中
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//说明已经迁移过了,当每个线程迁移完成后,它还会重新检查一遍,确保没有遗漏
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//这里就是将有数据的节点真正的迁移了
else {
/**
* 这里要加锁,避免A线程迁移的过程中,B线程往旧的数组里put元素
* put先抢到锁,就先添加,迁移线程先抢到锁,则先迁移
*/
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//处理链表
if (fh >= 0) {
//fh & n: 如果是0,说明是放到原位置,这个和HashMap一样的
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//放到新数组的【原位置】
setTabAt(nextTab, i, ln);
//放到新数组的【原位置 + oldCap】位置
setTabAt(nextTab, i + n, hn);
//旧数组放一个fwd节点
setTabAt(tab, i, fwd);
//再次向前推进
advance = true;
}
//处理红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
复制代码
这里其实相对简单,我们简单画个图来展示下:
3.5.3 迁移完成
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//....省略其他代码.......
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
/**
* 迁移完成nextIndex = -1
*/
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//....省略else if....
}
/**
* 迁移完成的判断
* 迁移到最后一个元素,i = -1
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//所有线程迁移完成了,将nextTable 置为null
nextTable = null;
//将迁移后的新数组赋给table,此时table就是新数组了
table = nextTab;
/**
* 计算新的扩容阈值
* sizeCtl = 2n - 0.5n = 1.5n = 0.75 * 2n
* JDK的开发者将位运算用的真是神乎其神
*/
sizeCtl = (n << 1) - (n >>> 1);
return;
}
/**
* 判断是否所有线程都完成了扩容
* sc - 1 是因为,协助扩容的时候sc + 1
*/
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
/**
* 将数据长度赋给i,重新检查旧数组的所有位置,确保都迁移完毕,没有遗漏
*/
i = n;
}
}
//......省略其他代码......
}
复制代码
我们重点看下CAS的判断逻辑:
外层判断(1) 为什么要对sizeCtl减1的操作呢?
这个主要是和协助扩容有关。
我们看下协助扩容的代码:
我的初始容量是64,计算出来的rs值是固定的32793,然后触发扩容的第一个线程A会将sizeCtl
修改为负数 -2145845246
, 然后参与扩容的线程B会将sizeCtl
修改为 -2145845246 + 1
,参与扩容的线程C将sizeCtl修改为-2145845246 + 1 + 1
, 接下来我们回到扩容完成的逻辑:
所以这个内层判断和外层判断的逻辑就是检查是否所有线程都完成了扩容操作。
如果从始至终都只有一个线程参与扩容这个判断也是成立的,因为sizeCtl的值先赋给了sc,然后通过CAS将sc - 1 赋给sizeCtl, 但是sc的值没变,所以内层判断肯定是成立的。
我们在简单看下,put元素时出现hash冲突
,并且hash=-1
时的协助扩容:
简单看下代码:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//double check
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
//sc = sizeCtl) < 0 :小于0表示扩容
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
/**
* 判断是否扩容完毕了。。
* (sc >>> RESIZE_STAMP_SHIFT) != rs 判断我看在JDK17中拿掉了
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//参与扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
复制代码
3.6 put元素过程总结
-
计算扰动计算出hash值,这个hash值永远是正数
-
如果没有初始化,就调用 initTable() 方法来进行初始化
-
如果没有 hash 冲突就直接 CAS 无锁插入
-
如果正在扩容,则当前线程协助扩容
-
如果存在 hash 冲突,就加锁来保证线程安全,两种情况:一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入
-
如果该链表的数量大于阈值8并且数组长度大于64,就要先转换成红黑树的结构
-
如果添加成功就调用 addCount() 方法统计 size,并且检查是否需要扩容
4.get()方法分析
理解了put方法,再来看get就非常简单了
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算key的hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//从数组的桶中查找到
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
/**
* 从红黑树中查找
* hash < 0 只有是扩容或者红黑树在桶中的元素节点
*/
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//从链表中查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
复制代码
5.debug 分析多线程扩容
准备测试数据:
/**
* @author qiuguan
* @date 2022/11/24 23:40:24 星期四
*/
public class ConcurrentHashMapTest {
public static void main(String[] args) {
/**
* 指定容量32,则table的初始容量是64
* 当达到 64 * 0.75 = 48 时触发扩容
*/
ConcurrentHashMap<String, Integer> chm = new ConcurrentHashMap<>(32);
//先放47个元素
for (int i = 1; i <= 47; i++) {
chm.put("jk" + i, i);
}
//Ea 的 hash值是: 2236, 在数组中的索引位置是: 60
//FB 的 hash值是: 2236, 在数组中的索引位置是: 60
int ea = "Ea".hashCode();
int fb = "FB".hashCode();
System.out.printf("Ea 的 hash值是: %s, 在数组中的索引位置是: %s\n", spread(ea), (64 -1) & spread(ea));
System.out.printf("FB 的 hash值是: %s, 在数组中的索引位置是: %s\n", spread(fb), (64 -1) & spread(fb));
//再次添加元素会触发扩容
new Thread(() -> chm.put("Ea", 11)).start();
new Thread(() -> chm.put("FB", 22)).start();
}
//将源码中计算hash值的逻辑拿出来s
static final int spread(int h) {
return (h ^ (h >>> 16)) & 0x7fffffff;
}
}
复制代码
关键位置打上断点:
先让thread0
(Ea)线程执行:
然后让thread0
开始迁移,迁移几个就行,因为"Ea
"的在数组中位置是靠后的(index = 60),而迁移是从后往前迁移的,所以确保"Ea
"迁移完成后,我们切换到 "thread1"线程。
在thread1线程进行协助扩容前,我先用几张图来演示下过程:
-
thread0
("Ea")触发扩容 -
thread0
("Ea")开始扩容并完成"Ea"数据的迁移工作
thread1
("FB")put元素发生hash碰撞并且桶中元素的hash值为-1,准备协助扩容
接下来我们就从"thread1
"视角看它是如何协助扩容的,首先要切换到"thread1
"线程
用一张图展示的话,大概就是下面这样