文章目录
- 1. 引言
- 2. 使用
- 3. 初始化
- 4. 存储流程
- 5. 取值流程
- 6. 扩容流程
1. 引言
ConcurrentHashMap
是一个线程安全的HashMap,在JDK1.7与JDK1.8,无论是实现还是数据结构都会有所不一样。这促使了ConcurrentHashMap
有着HashMap
一样的面试高频考点。
接下来,我将会以下面几点带硬核大家从源码角度理解ConcurrentHashMap
的整体流程,开始发车!
注意:若文章无特殊说明均代表JDK1.8的
ConcurrentHashMap
2. 使用
在进入源码学习之前,先回忆一下ConcurrentHashMap
是如何使用的。
public static void main(String[] args) {
Map<String, String> map = new ConcurrentHashMap<>();
map.put("a", "b");
map.put("b", "c");
map.put("c", "d");
System.out.println(map.get("a"));
}
ConcurrentHashMap
简单使用如上,不过多赘述。
3. 初始化
想学学习一个类的源码,就必须由浅入深,先从构造方法开始学习。
无参构成,没啥好聊的
public ConcurrentHashMap() {
}
有参构造,构造参数为初始化容量
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
// initialCapacity为0 抛异常
throw new IllegalArgumentException();
// 判断初始化容量参数initialCapacity 与 MAXIMUM_CAPACITY >>> 1 的大小
// 如果 initialCapacity 大于等于 MAXIMUM_CAPACITY >>> 1
// 则取 MAXIMUM_CAPACITY 为容量
// MAXIMUM_CAPACITY 是Map的最大容量
// 如果 initialCapacity 小于 MAXIMUM_CAPACITY >>> 1
// 找出距离initialCapacity最近的2次幂
// 为什么要2次幂????别急 后面会聊到。
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
// 根据传递进来的参数,找出这个参数最近的2次幂
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
有参构造,构造参数为一个Map
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
// 容量大小初始化为默认的容量16
this.sizeCtl = DEFAULT_CAPACITY;
// 将Map的元素全部put进去
putAll(m);
}
4. 存储流程
使用ConcurrentHashMap
将一个键值对放进Map的时候,我们通常调用put
方法
public V put(K key, V value) {
// 在put方法中,并没有做太多的事情,而是直接调用了putVal方法
// 对于putVal方法,有三个参数,key-value就没啥好说的,就是需要存储的key-value值
// 第三个参数传递一个boolean
// 如果为false,代表如果Key存在了,直接覆盖数据
// 如果为true,代表如果Key存在了,什么都不做
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许Key 或 Value 当中有一个为null
// 为啥呢?
// 是因为ConcurrentHashMap的应用场景是多线程场景下,如果Key或Value为null容易出现歧义
// 毕竟无法得知Key 或 Value为null,是因为本身存储的就是null还是因为其他线程修改导致出现的null
if (key == null || value == null) throw new NullPointerException();
// 计算哈希值,请看下面的spread方法
int hash = spread(key.hashCode());
int binCount = 0;
// tab指向table, table就是JDK1.8中ConcurrentHashMap的Node数组
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果tab为null 或 table长度为0 那么进行初始化table操作
if (tab == null || (n = tab.length) == 0)
// 请看下面的initTable方法解释
tab = initTable();
// tabAt方法的详解请看下面
// (n - 1) & hash 是计算hash对应的索引下标,判断table对应的这个索引下标是否有值
// 通过CAS获取table对应索引下标的值
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果table数组在i索引下标位置没有值,利用CAS插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 能走到这个else-if 说明hash值计算出来的索引下标在table中存在值了
// f是一个Node数组的一个元素
// 取出这个Node元素的hash值,如果哈希值为MOVED,那么代表当前hash位置的数据正在扩容
// static final int MOVED = -1
else if ((fh = f.hash) == MOVED)
// 扩容机制后面再聊
tab = helpTransfer(tab, f);
else {
// 能走到这里,说明hash值计算出来的索引下标在table中存在值了,并且当前不处于扩容
// 就需要往链表里面插入数据了 往链表插入数据,需要锁当前Node数组下标i的数据块
V oldVal = null;
synchronized (f) {
// 校验一下table在i的下标的下标是不是等于f
// 这是一个双重校验,校验一下索引下标i的桶是否已经包含了期望的节点f
if (tabAt(tab, i) == f) {
// 能进来说明包含了,索引下标i的桶存储的就是期望的节点f
// tabAt(tab, i) == f 证明是正常情况,索引下标i的桶的对象没被其他线程修改更换
// 前面fh = f.hash, 所以fh记录的是f的哈希值
// static final int MOVED = -1; 代表当前hash位置的数据正在扩容!
// static final int TREEBIN = -2; 代表当前hash位置下挂载的是一个红黑树
// static final int RESERVED = -3; 预留当前索引位置……
// 判断一手fh是不是大于0,也就是排除上面的三种情况
if (fh >= 0) {
// binCount是用来记录链表下面挂了几个
binCount = 1;
// 遍历下标i对应的桶下的链表,每遍历一次,binCount+1
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 看到这里或许有点忘记了,这个hash就是要存的键值对的Key计算出来的二次哈希值
// 判断一下数组下标i的hash与需要存的键值对的hash是否一样,表示判断是否是重复数据
if (e.hash == hash &&
// 判断一手要存的键值对的Key与数组下标i的Key是不是同一个
// 只要地址或内容有一个一样 说明就是同一个key
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 将老数据赋值给oldVal
oldVal = e.val;
// onlyIfAbsent就是put方法里面调用putVal方法里面的布尔值参数
// 如果为false 则新数据覆盖旧数据
// 如果为true 则不做任何处理
if (!onlyIfAbsent)
e.val = value;
break;
}
// 能走到这里,就代表了要存储的键值对,与当前遍历的Node节点记录的Key不是同一个
// pred记录当前的Node节点
Node<K,V> pred = e;
// e记录挂在e下的一个Node节点
// 判断一下e是不是为null 如果不为null 说明pred下面还有一个节点
// 那么继续走循环 继续判断是不是同一个Key 用不用覆盖数据
if ((e = e.next) == null) {
// 当走到最后一个Key都不是同一个的话,那么就创建一个Node节点挂上去
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 走到这个else-if说明fh >= 0不成立
// 那么判断一手当前下标i挂的是不是红黑树
else if (f instanceof TreeBin) {
// 如果是红黑树,就需要将数据插入进红黑树中
Node<K,V> p;
// 这个就有意思了,前面将数据插入链表的时候binCount初始化为1的
// 将数据插入红黑树的时候,binCount却初始化为2
// 这个暂时没想懂 后续懂了再补充
binCount = 2;
// 将Key-value放进红黑树中
// putTreeVal方法 如果返回null则代表添加
// 否则代表查找, 返回Key一样的节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
// 返回的p不为null 说明存在一样的Key
// 记录Key对应Value的旧值
oldVal = p.val;
if (!onlyIfAbsent)
// 覆盖数据
p.val = value;
}
}
}
}
// 到这里,就完成了数据的插入
// 到这一步,就是大家都熟悉的扩容或是链表转化为红黑树的操作了
if (binCount != 0) {
// binCount不为0,说明下标i对应的桶下的节点总数不等于0
if (binCount >= TREEIFY_THRESHOLD)
// 节点总数大于等于8, 可能进行扩容,也可能进行链表转化红黑树
// 这个方法后面再说
treeifyBin(tab, i);
if (oldVal != null)
// oldVal记录的是Key一样的情况下 旧的Value值
// 如果存在Key一样的情况下,那么就将旧的value值返回
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
二次哈希——spread
方法
// 方法入参参数为Key的哈希值
// 在这个方法中,首先Key的哈希值h先要自身哈希值的高16位进行^(异或操作,相同为0,不同为1)
// 为什么要进行^操作??
// 原因是在后面的(n-1)&hash的操作计算索引下标的时候
// 00000000 00000000 00000000 01010101
// 00000000 00000000 00000000 00011111
// 可以看见,由于n的数值较小,高16位根本不参与运算,于是设计HasMap的作者就想出了二次哈希
// 就是将低16位与高16位进行^操作,综合高位数据,让哈希值分布更加均匀,减少哈希冲突
// 那么为什么低16位^高16位的计算结果要和HASH_BITS进行&(与运算,只有都为1的时候才为1)?
// 首先HASH_BITS的取值为0x7fffffff,这个值就是int的最大值 也就是01111111111111111111111111111111
// 而Key的哈希值也为int,所以哈希值的最大值也是0x7fffffff
// (h ^ (h >>> 16))完成后可能会导致进位,也就是位数超出32位
// 因此需要和HASH_BITS进行与操作,将哈希值的取值范围控制在32位,也就是将高位屏蔽
// 这样就能在下次(n-1)&hash提高运行效率
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
初始化table方法——initTable
方法
private transient volatile int sizeCtl;
// sizeCtl: 表初始化和调整大小控件
// sizeCtl < 0: 表正在初始化或调整大小
// -1: 表示数组正在初始化
// < -1: 低16位代表当前数组正在扩容的线程个数(如果1个线程扩容,值为-2,如果2个线程扩容,值为-3)
// sizeCtl = 0: 代表数组还没初始化
// sizeCtl > 0: 代表当前数组扩容阈值, 或者是当前数组的初始化大小
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 初始化未完成时,一直进行while循环
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 小于0代表其他线程正在初始化,线程等待一下继续while循环
Thread.yield(); // lost initialization race; just spin
// 进行CAS修改
// compareAndSwapInt方法
// 参数var1:表示要操作的对象本身;
// 参数var2:表示要操作对象中内存地址的偏移量;
// 参数var3:表示需要修改数据的期望的值;
// 参数var4:表示需要修改为的新值;
// 线程安全,确保只有一个线程初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 更新成功之后,还需要判断一手
// 防止重复初始化table,因为可能其他线程已经完成了table的初始化
if ((tab = table) == null || tab.length == 0) {
// 如果table初始化还未完成,那么久进行table初始化
// sc记录的是sizeCtl更新为-1之前的值
// sc > 0: 代表当前数组扩容阈值, 或者是当前数组的初始化大小
// sc < 0: 则取默认扩容容量 16
// 默认使用无参构造方法的时候,默认扩容容量为16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 创建一个Node数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// table指向初始化的Node数组
table = tab = nt;
// 这个就是负载因子的由来
// 首先 n >>> 2 是将n的二进制向右移动两位
// 无论是构造方法指定容量还是使用DEFAULT_CAPACITY,n都是2的次幂
// 那么 n>>>2 就是等同于将n÷4
// 因此 sc = 0.75n
// 0.75n > 0 根据前面的 sizeCtl 的定义
// 此刻0.75n代表了数组扩容阈值
// 也就是说当容量达到0.75n的时候进行扩容
sc = n - (n >>> 2);
}
} finally {
// 将上面求得的扩容阈值赋值给sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}
CAS 返回table某个下标的Node——tabAt
方法
// tab指向的是table table是被volatile修饰的
// 使用Unsafe类的getObjectVolatile方法获取索引下标的对象值
// getObjectVolatile方法第一个参数为获取值的对象 第二个参数为对象的内存偏移量,表示要访问对象的具体字段或数组元素位置。
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
利用CAS往table数组的某个下标插入值——casTabAt
方法
// 利用Unsafe类的compareAndSwapObject方法 将table数组的某个下标对应值替换成需要存储的键值对
// compareAndSwapObject方法
// 第一个参数为需要操作的对象
// 第二个参数为对象的内存偏移量,表示要访问对象的具体字段或数组元素位置。
// 第三个参数为期望的值,用于比较对象当前的值。
// 第四个参数为要设置的新值,如果对象的当前值与期望值相等,则将新值设置到对象上。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
5. 取值流程
对于取值,通常都是通过get
方法根据Key取值
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算Key的二次哈希值
int h = spread(key.hashCode());
// table已经初始化 并且 table长度大于0 并且 Key的二次哈希值计算出的索引下标的桶中有值才进去找
// 否则直接return null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
// 桶下挂的一个节点的哈希值与Key的二次哈希值一样
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// 并且当Key的地址或Key的内容一样 则说明这就是Key对应的Value
return e.val;
}
else if (eh < 0)
// static final int MOVED = -1; 代表当前hash位置的数据正在扩容!
// static final int TREEBIN = -2; 代表当前hash位置下挂载的是一个红黑树
// static final int RESERVED = -3; 预留当前索引位置……
// eh小于0, 也就是上面三种情况,说明桶下可能是个红黑树
return (p = e.find(h, key)) != null ? p.val : null;
// 上述都不成立的情况下,只能是链表了
// 一个个遍历即可
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
// Node内部的实现的find, 用于支持get方法
// 由于桶中可能包含链表或红黑树结构,因此需要根据情况进行不同的查找方式
// 当桶中的节点数量较多,且已经转换为红黑树时,会调用红黑树节点的 find 方法来进行查找,以保证查找效率
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
6. 扩容流程
ConcurrentHashMap
的扩容实现是要比HashMap
复杂的。
ConcurrentHashMap
的应用场景是多线程场景,需要综合考虑多线程对扩容产生的影响,避免HashMap
在多线程情况下扩容出现了死链或数据错乱的问题。
触发扩容机制的触发,主要涉及两个方法``treeifyBin与
tryPresize`方法
treeifyBin
方法: 在putVal
方法的时候,将一个键值对放进桶中,当链表长度大于等于8时,如果数组长度小于64,会调用treeifyBin
方法进行扩容tryPresize
方法: 针对putAll
或将Map作为构造参数public ConcurrentHashMap(Map<? extends K, ? extends V> m)
时候会可能触发的tryPresize
方法进行扩容
这个扩容流程有点还没捋清楚,下一章再更新吧~