一、ConcurrentHashMap的前置知识扫盲
ConcurrentHashMap的存储结构?
数组 + 链表 + 红黑树
二、ConcurrentHashMap的DCL操作
HashMap线程不安全,在并发情况下,或者多个线程同时操作时,肯定要使用ConcurrentHashMap
无论是HashMap还是ConcurrentHashMap,在new好之后,数组并不是直接初始化好的。
如果是这种懒汉式的初始化方式,ConcurrentHashMap需要保证初始化数组时,是线程安全的。
看源码之前,先掌握一个ConcurrentHashMap的核心属性,这个属性是控制扩容和初始化数组的核心属性。
private transient volatile int sizeCtl;
sizeCtl是控制数组的初始化和扩容的。
sizeCtl == -1: 代表数组正在初始化。
sizeCtl < -1: 代表数组正在扩容
sizeCtl == 0: ConcurrentHashMap刚刚new好,并且没指定数组的初始化长度(默认长度为16)
sizeCtl > 0:
- ConcurrentHashMap刚刚new好,指定了数组的初始化长度,长度就是sizeCtl
- ConcurrentHashMap已经在使用了,sizeCtl代表扩容的阈值(数组长度 * 0.75)
了解sizeCtl之后,开始看初始化数组的源码。
// ConcurrentHashMap初始化数组的方法
private final Node<K,V>[] initTable() {
// 声明了两个属性,tab,sc
Node<K,V>[] tab; int sc;
// 判断数组初始化了咩? Check
while ((tab = table) == null || tab.length == 0) {
// 没初始化,准备初始化操作!
// 给sc赋值,并且判断数组是否正在初始化。
if ((sc = sizeCtl) < 0)
// 让出CPU的时间片,等待其他更多的机会完成初始化数组操作。
Thread.yield();
// 没线程初始化,我来初始化。
// 基于CAS的方式,将sizeCtl从原值改为-1,如果成功了,代表当前线程可以做初始化操作了。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // Lock
// 当前线程要开始初始化数组了。
try {
// 再次判断数组初始化了咩? Check
if ((tab = table) == null || tab.length == 0) {
// 数组没初始化。
// 获取数组的初始化长度。如果sizeCtl > 0 ,就用sizeCtl作为初始化的长度,否则使用默认的16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 创建数组!
Node[] nt = new Node[n];
// 将初始化好的数组,赋值给成员变量
table = tab = nt;
// 算出下次扩容的阈值
sc = n - (n >>> 2);
}
} finally {
// 将下次扩容的阈值赋值给sizeCtl,初始化完毕。
sizeCtl = sc;
}
break;
}
}
return tab;
}
三、ConcurrentHashMap的散列算法
ConcurrentHashMap基于key的一系列运算,最种得出元素要放到数组的哪个索引位置上。
暂时就认为ConcurrentHashMap是将key调用了hashCode得到了一个int类型的数值。
其实计算索引位置就是将数组长度 - 1和key的hashCode值做&运算得出的结果就是索引位置。
// 先优化一下代码,看的更清楚
((f = tabAt(tab, i = (n - 1) & hash)) == null)
// 代表拿到数组的某个索引位置的元素,f,如果为null准备插入数据
(f = table[(n - 1) & hash] == null)
// 这行就是在计算索引位置
(n - 1) & hash
n:数组的长度
hash:key.hashCode();
// n == 16
// hash:随便的一个int数值
00000000 00000000 00000000 00010000 :n 16
00000000 00000000 00000000 00001111 :n - 1 15
&
01010101 01010101 00110110 00101010 :hash
=
00000000 00000000 00000000 00001010 :index 10
如果ConcurrentHashMap的数组长度,允许17的话,会出现什么情况
00000000 00000000 00000000 00010001 :n 17
00000000 00000000 00000000 00010000 :n - 1 16
&
01010101 01010101 00110110 00111010 :hash
=
00000000 00000000 00000000 000 0000 :index 10
散列算法
目的就是让key的HashCode值的高低16位进行亦或运算,再和数组长度 - 1做&运算,最终得到元素存储的位置。
00000000 00000000 00000000 00010000 :n 16
00000000 00000000 00000000 00001111 :n - 1 15
01010101 01010101 00110110 00101010 :name.hashCode();
00000001 01000001 00000110 00101010 :age.hashCode();
散列算法就对这种情况做了优化!
散列算法:
(h ^ (h >>> 16)) & HASH_BITS;
优化一波
h ^ (h >>> 16)
01010101 01010101 00110110 00101010 :name.hashCode()
^
00000000 00000000 01010101 01010101 :name.hashCode() >>> 16
=
00000000 00000000 00000000 00001111 : 最终name的hashCode
00000001 01000001 00000110 00101010 :age.hashCode();
^
00000000 00000000 00000001 01000001 :age.hashCode() >>> 16
=
00000000 00000000 00000000 00001011 : 最终age的hashCode
为什么spread会让hash值和HASH_BITS做&运算
发现spread方法里,得到hash值之后,还做了一波&运算。
hash & HASH_BITS;
HASH_BITS = 01111111111111111111111111111111
hash值和HASH_BITS做&运算后,得到的结果除了最高位是0之外,其他位数没变化。
目的就是确保hash值算出来的一定是一个正数,因为负数有特殊含义。
static final int MOVED = -1; 如果存在数组中的数据的hash为-1,代表当前数组正在扩容!
static final int TREEBIN = -2; 如果存在数组中的数据的hash为-2,代表当前索引位置下挂的是红黑树!
static final int RESERVED = -3; 如果存在数组中的数据的hash为-3,当前当前数组的索引位置已经被占用了(value还没计算出来)
四、ConcurrentHashMap的并发安全(写数据)
首先确认ConcurrentHashMap在并发执行写操作时,线程是安全的。
同时还需要保证效率要高。
在JDK1.7中的实现是采用Segement分段锁的形式实现的。
Segement锁的本质就是ReentrantLock,一个Segement会管理多个索引位置,当操作指定索引位置前,需要先去或者这个索引位置对应的锁,再来执行操作。 这种方式在数组长度变长之后,效率也就一般般。
在JDK1.8中,采用的方式,可以实现为每一个索引位置都是一把独立的锁,不存在一个锁管理多个索引位置的情况,是一对一的方式。
代码实现的效果。 WCWCWCWCWCWCWCWCWCWCWCWCWC!!!!!
for (Node<K,V>[] tab = table;;) {
// 省略部分代码
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 进到这,说明当前数组的索引位置,没数据,数据要放到数组上
// 当数据要放到数组上时,基于CAS的形式存放。
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
else {
// 进到这,说明数组的索引位置有数据,数据要挂到链表或者红黑树上
V oldVal = null;
// f就是索引位置上的Node对象
// 当操作时,根据数组上的f进行加锁,实现锁的细粒度化~
synchronized (f) {
// 省略部分代码
}
}
}
五、ConcurrentHashMap的计数器和size方法
计数器:每次ConcurrentHashMap在写入一个数据后,需要+1,删除一个数据后,需要-1
size方法:帮你返回当前ConcurrentHashMap中的元素个数。
计数器需要保证线程安全的同时,实现++操作,一般就采用CAS,Java中在JUC好下,恰巧提供了Atmoic的原子类,内部已经帮你实现的了,基于CAS的++操作。
发现AtmoicInteger这种提供了increment操作的原子类中,是基于do-while + CAS实现的,如果并发比较大的话,会造成不停的CAS,导致浪费CPU资源。
所以ConcurrentHashMap并没有使用AtmoicInteger的方式去实现++的线程安全,是采用了一个LongAdder的实现机制。LongAdder有一个类似分段锁的概念。
ConcurrentHashMap并没有直接调用LongAdder,而是再次实现了LongAdder的核心代码。
size方法,就是将BaseCount和CounterCell数据的值进行一波统计,最终得出结果。
size中的核心就是sumCount方法,在内部就是拿到baseCount,然后遍历CounterCell[],将内部的每一个value做 +=,最终计算出元素个数。
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}