目录
一. 前言
二. 源码解析
2.1. 类结构
2.2. 基本属性
2.3. 构造方法
2.4. 增加元素
2.4.1. initTable()
2.4.2. helpTransfer()
2.4.3. transfer()
2.4.4. treeifyBin()
2.4.5. addCount()
2.5. 获取元素
2.6. remove() & replace()
2.7. clear()
2.8. size()
2.9. mappingCount()
2.10. unsafe 方法
三. 常见问题
3.1. 什么时候扩容?
3.2. 什么时候树化和退化?
3.3. JDK1.8为什么放弃分段锁?
3.4. JDK1.8的map实现
3.5. 为什么不用ReentrantLock而用synchronized?
3.6. 多个线程又是如何同步处理的呢?
3.7. 为什么 key 和 value 不允许为 null?
一. 前言
ConcurrentHashMap 是 HashMap 的线程安全版本,其内部和 HashMap 一样,都是采用了数组 + 链表 + 红黑树的方式来实现。
如何实现线程的安全性?加锁。但是这个锁应该怎么加呢?在 HashTable 中,是直接在 put 和 get 方法上加上了 synchronized,理论上来说 ConcurrentHashMap 也可以这么做,但是这么做锁的粒度太大,会非常影响并发性能,所以在 ConcurrentHashMap 中并没有采用这么直接简单粗暴的方法,其内部采用了非常精妙的设计,大大减少了锁的竞争,提升了并发性能。(CAS + synchronized )。
二. 源码解析
2.1. 类结构
ConcurrentHashMap继承了AbstractMap抽象类,该抽象类定义了一些基本操作,同时,也实现了ConcurrentMap接口,ConcurrentMap接口也定义了一系列操作,实现了Serializable接口表示ConcurrentHashMap可以被序列化。
2.2. 基本属性
//数组最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//构造时,不设的话,默认数组大小16,
private static final int DEFAULT_CAPACITY = 16;
//最大数组大小
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//当节点数量>8时,会考虑进行树化。当确定数组容量>64进行树化。
static final int TREEIFY_THRESHOLD = 8;
//节点数量<6时,退化为链表。
static final int UNTREEIFY_THRESHOLD = 6;
//当数组数量>64时,会树化
static final int MIN_TREEIFY_CAPACITY = 64;
//迁移最小步长,每个线程执行迁移时,最少迁移16个节点的数据
private static final int MIN_TRANSFER_STRIDE = 16;
//sizectl位移位数
private static int RESIZE_STAMP_BITS = 16;
//最大迁移时辅助线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//记录ctrl位移位数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
static final int MOVED = -1; // hash for forwarding nodes
// 表示已经转换成树
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
//表示2^31-1,int的最大正整数
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
/* ---------------- Fields -------------- */
//存储表数据
transient volatile Node<K,V>[] table;
//执行迁移时,用于临时存储扩容之后的数据,扩容结束后,nextTable赋值给table
private transient volatile Node<K,V>[] nextTable;
//记录基本计数器,用于统计节点个数,每次插入/删除数据时更新,当没冲突时候,则累加到该变量上,冲突剧烈则添加到counterCells数组里面。
private transient volatile long baseCount;
/**
* 用来控制表初始化和扩容的,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75
* 当为负的时候,说明表正在初始化或扩张,
* 0:默认状态,表示数组还没有被初始化。
* -1:初始化数组
* -(1+n):n:表示活动的扩张线程
* sizeCtl>0:记录下一次需要扩容的大小。为3/4数组最大长度
*/
private transient volatile int sizeCtl;
//记录当前迁移数据到数组下标哪儿了。从table.leng -> 0
private transient volatile int transferIndex;
//搭配counterCells用于加锁,用于修改节点个数。
private transient volatile int cellsBusy;
//记录每个线程修改次数
private transient volatile CounterCell[] counterCells;
2.3. 构造方法
// 无参构造函数,什么也不做,table的初始化放在了第一次插入数据时,默认容量大小是16和HashMap的一样,默认sizeCtl为0
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
// 确保容量正数
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 如果传入的容量大小大于允许的最大容量值 则cap取允许的容量最大值 否则cap =
// ((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方),
// 即如果传入的容量大小是12 则 cap = 32(12 + (12 >>> 1) + 1=19
// 向上取2的幂次方即32),这里为啥一定要是2的幂次方,原因和HashMap的threshold一样,都是为
// 了让位运算和取模运算的结果一样。
// MAXIMUM_CAPACITY即允许的最大容量值 为2^30。
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
// tableSizeFor这个函数即实现了将一个整数取2的幂次方。
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// 将上面计算出的cap 赋值给sizeCtl,注意此时sizeCtl为正数,代表进行扩容的容量大小。
this.sizeCtl = cap;
}
// 包含指定Map的构造函数。置sizeCtl为默认容量大小 即16。
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
// 传入容量大小和负载因子的构造函数。默认并发数大小是1。
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
// 传入容量大小、负载因子和并发数大小的构造函数
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 如果传入的容量大小 小于 传入的并发数大小,
// 则容量大小取并发数大小,这样做的原因是确保每一个Node只会分配给一个线程,而一个线程则
// 可以分配到多个Node,比如当容量大小为64,并发数大小为16时,则每个线程分配到4个Node。
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
// size = 1.0 + (long)initialCapacity / loadFactor 这里计算方法和上面的构造函数不一样。
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// 如果size大于允许的最大容量值则 sizeCtl = 允许的最大容量值 否则 sizeCtl = size取2的幂次方。
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
ConcurrentHashMap的构造函数有5个,从数量上看就和HashMap、Hashtable(4个)的不同,多出的那个构造函数是 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel),即除了传入容量大小、负载因子之外还多传入了一个整型的concurrencyLevel,这个整型是我们预先估计的并发量,比如我们估计并发是30,那么就可以传入30。
其他的4个构造函数的参数和HashMap的一样,而具体的初始化过程却又不相同,HashMap和Hashtable传入的容量大小和负载因子都是为了计算出初始阈值(threshold),而ConcurrentHashMap传入的容量大小和负载因子是为了计算出sizeCtl用于初始化table,这个sizeCtl即table数组的大小,不同的构造函数计算sizeCtl方法都不一样。
/**
* Returns a power of two table size for the given desired capacity.
* See Hackers Delight, sec 3.2
*
* 返回大于输入参数且最近的2的整数次幂的数
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
// 执行上面步骤后,所有位都为1,这样n+1后 1111 会进一位,为10000
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
比如,7最接近的2整数幂为8,8最接近的整数幂为8。
n |= n >>> 1; n右移动一位后,会确保n的(最高位)和(最高位-1)都为1。
n |= n >>> 2; n右移动二位后,这样(最高位-2)和(最高位-3)都为1,进行 | 操作后,(最高位)~(最高位-4)都为1。
n |= n >>> 4; …同理,执行结束后,会确保n的(最高位)~(第一位)都为1。
而c-1,是当c为2的N次幂时,假设为8,减1后,二进制为0111,这样执行结束后为111。返回的时候n+1,有返回了当前值。不减1的话,1000,执行结束后为1111,返回时执行n+1,返回了16。
2.4. 增加元素
put(K key, V value) 方法:通过自旋,将value添加到map。
1> 倘若tab没初始化,则初始化。
2> tab初始化了,当前数组下标中没元素,则新建一个链表元素,CAS添加到数组中。成功则结束循环体,失败则继续自旋。
3> 当前节点有元素,且节点处于迁移状态,则辅助去迁移。
4> 节点有元素,说明hash冲突了,则添加到链表or红黑树中。(节点个数达到8以上,则进行扩容或者树化节点;若只是修改数据,则直接返回。)
public V put(K key, V value) {
/*
* onlyIfAbsent
* false:这个value一定会设置
* true:只有当这个key的value为空的时候才会设置
*/
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许key/value为null,否则及时失败
if (key == null || value == null) throw new NullPointerException();
// 获取key的hashCode
int hash = spread(key.hashCode());
// 用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树
int binCount = 0;
// 通过自旋将value添加到map
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 1:初始化tab
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if (
// 2:当前位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的
// (n-1)&hash:计算元素索引,这里绝对不会超过表下标。
(f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 2.1:添加失败则继续自旋,成功则结束自旋。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
/*
* 3:如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段,
* 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失
*/
tab = helpTransfer(tab, f);
else {
// 4:走到这里说明hash冲突了,则将value添加到链表或者红黑树结构下
V oldVal = null;
synchronized (f) {
// 4.1:再次取出要存储的位置的元素,跟前面取出来的比较,相同则处理,否则继续自旋添加。
if (tabAt(tab, i) == f) {
// 4.1.1:取出来的元素的hash值大于0,说明是链表。当转换为树之后,hash值为-2
if (fh >= 0) {
// 统计遍历个链表节点个数
binCount = 1;
// 4.1.2:遍历每一个节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 4.1.3:节点hash值一样且key一样
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 4.1.4:判定是否存在,存在则进行修改。
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 4.1.4:下一个节点为null,说明遍历结束,则新建对象.
if ((e = e.next) == null) {
// 添加链表尾端
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 对红黑树的处理
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 5:链表节点其中元素个数达到8同时数组长度>64,则转成树。否则仅扩容。
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 说明仅是修改,则直接返回值。
if (oldVal != null)
return oldVal;
break;
}
}
}
// 6:累加元素个数,传入添加的数量1,以及链表节点个数,用于控制是否执行扩容操作。
addCount(1L, binCount);
return null;
}
2.4.1. initTable()
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 1:sizeCtl初始值为0,当小于0的时候表示在别的线程在初始化表或扩展表
// 则暂停当前正在执行的线程对象,并执行其他线程。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (
// 2.1:如果当前内存偏移量SIZECTL的值为sc,则将sizeCtl原子修改为-1,表示处于初始化状态
// 修改成功则初始化数组,否则继续自旋。搭配第一步则能确保同一时间只有一个创建数组。
U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 双重检查加锁,避免其他方法新建tab时覆盖,增加容错
if ((tab = table) == null || tab.length == 0) {
// 2.2:指定了大小的时候就创建指定大小的Node数组,否则创建指定大小(16)的Node数组
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 2.3:初始化table完成,并赋值给tab用于返回
table = tab = nt;
// 2.4:sc=n*3/4
sc = n - (n >>> 2);
}
} finally {
// 2.5:初始化后,sizeCtl长度为数组长度的3/4
sizeCtl = sc;
}
break;
}
}
return tab;
}
initTable():初始化,自旋创建数组。
1> sizeCtl <0 则说明其他线程在扩容或者初始化,则礼让。
2> CAS修改sizeCtl=1,表示初始化中。
2.1> 执行初始化数组。没指定大小,则默认16;
2.2> 设置sizeCtl为数组3/4。
2.4.2. helpTransfer()
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
// 辅助去扩容
Node<K,V>[] nextTab; int sc;
// 1:tab不为空,同时f为头节点(当节点处于move状态时,头节点为ForwardingNode)
// 同时头节点的下一个节点不为空则进入循环体。为空说明仅仅只是标记当前节点已经迁移,但节点内部没数据。所以无需辅助迁移。
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// 2:判断待迁移数据,和临时存储迁移数据数组是否有改变,没改变则添加到迁移任务中。
// sizeCtl<0则表示处于迁移状态中。
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 2.1:如果 sc 的高 16 位也就是当前n扩容标识,不等于标识符,这说明扩容的容量变化了,不是当前原容量扩容,则退出。
// 走到这里,sc的第32位为1,高16~31位表示数组的长度转成2进制前面有多少个0。低位第二位为1,这是第一个线程扩容时位移后执行了+2操作。
// 当sc位移16位会覆盖低位的数,一般情况下值会和resizeStamp(tab.length)一样。这里可以理解为解码比较是否相等。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs
// 2.2:如果 sc == 标识符 + 1 ,说明扩容结束了,不再有线程进行扩容。则退出
//(默认第一个线程设置 sc ==rs 左移 16 位 + 2,
// 当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
|| sc == rs + 1 ||
// 2.3:如果 sc == 标识符 + 65535(帮助线程数已经达到最大),则退出。
sc == rs + MAX_RESIZERS ||
// 2.4:说明已经扩容完成又或者有足够的线程扩容,则退出。
transferIndex <= 0)
break;
// 3:线程数+1,帮助一起转换
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
helpTransfer():辅助扩容。
1>. tab不为空,且节点为Move类型,且节点下一个元素不为空。(为空则说明是空节点,直接返回)。
2>. 当处于迁移状态时(即sizeCtl<0则说明处于迁移态),自旋。
2.1>. 扩容容量改变,或扩容结束,或辅助扩容线程达到最大限制,或扩容任务已经分配完,则退出。
2.2>. 修改sizeCtl线程数+1,并辅助迁移数据。辅助完则退出。
2.4.3. transfer()
// 转移节点
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 1:数组长度为n,表示n个任务,每个线程处理多少个任务。
// 如果为单核,则处理所有的任务。多核则处理n>>>3/核心数个任务,最少处理16个任务。用stride记录每个处理多少任务。
// 当分配第一个线程处理任务时,transferIndex累加stride。
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 2:当第一个线程进此方法的时候,nextTab才会为null,辅助线程直接跳过。初始化扩容之后的数组,容量扩大一倍。
// 并初始化转移的索引,为待迁移数组的最大长度。
if (nextTab == null) { // initiating
try {
// 扩容一倍数组容量
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
/*
* 3:创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节,表示处于move状态。
*/
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 是否继续向前查找的标志位
boolean advance = true;
// 在完成之前重新在扫描一遍数组,看看有没完成的没
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 4:该循环体用于控制从(transferIndex-1)到(transferIndex-stride),执行数据的迁移
// 且重新分配transferIndex的值,用于不停向前推进更新迁移数据。
while (advance) {
int nextIndex, nextBound;
// 4.1:i每次自减,小于范围(表示从(transferIndex-1)到(transferIndex-stride),执行完数据的迁移)
// 或者 当前线程执行完成 则标记不需要再向前查找
if (--i >= bound || finishing)
advance = false;
// 4.2:transferIndex<=0,则说明目前每个桶位置都有线程在进行处理,跳出循环
// 每次执行一次任务,transferIndex减少一次步长stride
// transferIndex值第一次为待迁移表长度
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 4.3:更新transferIndex的值,减少一个步长的值。
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 这里分别设置i和bound,分别表示从(transferIndex-1)开始递减遍历到(transferIndex-stride),执行数据的迁移。
// 假设当前数组长度为32,stride=16
// 则nextIndex=32
// transferIndex=nextBound= nextIndex - stride=16
// bound=16
bound = nextBound;
// i=31
i = nextIndex - 1;
// 设置不用向前推进
advance = false;
}
}
// 5.1:对迁移完数据的后置处理。包括重新检查一遍迁移数据,以及归还线程。
// i<0:说明从(transferIndex-1)到(transferIndex-stride)节点数据迁移结束。
// i>=n和i + n >= nextn:说明出现了扩容并发修改了transferIndex,造成修改,则结束修改
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 5.1.1:已经完成转移,更新数组的值
if (finishing) {
nextTable = null;
// 这里完成nextTab=>table转换
table = nextTab;
// 为扩容后的0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 5.1.2:正在工作的线程数-1,并返回
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 第一个线程转移节点的时候,会sc-2,后续的线程参与转移时会,sc+1。
// 当sc-2时值不一样,说明不是最后一个线程,是辅助线程在执行,则直接返回。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 说明是最后一个线程,则重新check一遍,在返回。
// 执行到这里,如果是单线程进行迁移,则从0~数组最大长度,重新检查一遍。
// 如果是多线程进行迁移,也极大最后
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
// 5.2:把数组中null的元素设置为ForwardingNode节点(hash值为MOVED),让循环体处理下一个节点。后续辅助线程发现节点为Move则会直接跳过。
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 5.3:表示已有线程正在处理,让循环体处理下一个节点。
advance = true; // already processed
else {
// 5.4:锁住节点,进行迁移.
synchronized (f) {
// 双重检查加锁
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 5.4.1:>=0说明是node节点
if (fh >= 0) {
// 为0则表示放在扩容后数组当前索引下,否则放在n+之前位置索引下
int runBit = fh & n;
Node<K,V> lastRun = f;
/*
循环结束之后,runBit就是最后不变的hash&n的值
也就是说由lastRun节点后的hash&n的值一样,这样就可以直接保存,而不需要处理后面的节点
*/
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 说明之后的节点都是低位
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
// 说明之后的节点都是高位
hn = lastRun;
ln = null;
}
// 前面的节点不确定高低位,所以遍历f~lastRun范围的所有节点
// 分别逆序存入ln或hn链表中
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 存入之前的位置
setTabAt(nextTab, i, ln);
// 存入改变后的位置
setTabAt(nextTab, i + n, hn);
// 设置fwd,这样其他线程执行的时候,会跳过去.
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
// 5.4.2:红黑树处理
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
/*
* 在复制完树节点之后,判断该节点处构成的树还有几个节点,
* 如果≤6个的话,就转为一个链表
*/
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位链表存储i处
setTabAt(nextTab, i, ln);
// 高位存储i+n处
setTabAt(nextTab, i + n, hn);
// 原来tab中存储fwd,标识该桶扩容完成
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
transfer(Node<K,V>[] tab, Node<K,V>[] nextTab):转移节点
1>. 设置步长,即每个线程每次任务迁移多少个步长节点数据。
2>. 新建扩容之后的数组,容量扩容一倍。设置transferIndex表示迁移到哪儿了。
3>. 循环迁移节点
3.1>. 新建循环体用于控制从(transferIndex-1)到(transferIndex-stride),执行数据的迁移且重新分配transferIndex的值,用于不停向前推进更新迁移数据。
3.2>. 对迁移完数据的后置处理。包括重新检查一遍迁移数据,以及归还线程。
3.3>. 把数组中null的元素设置为ForwardingNode节点(hash值为MOVED),让循环体处理下一个节点。后续辅助线程发现节点为Move则会直接跳过。
3.4>. 锁住节点,进行迁移。
2.4.4. treeifyBin()
treeifyBin(Node<K,V>[] tab, int index):主要作用是扩容以及红黑树化节点数据。
/**
* Replaces all linked nodes in bin at given index unless table is
* too small, in which case resizes instead.
* 数组长度<64,则扩容一倍
* 否则转成树
*/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// 1:数组长度<64则扩容一倍
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 2:转成红黑树
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
// 把Node组成的链表,转化为TreeNode的链表,头结点依然放在相同的位置
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 把TreeNode的链表放入容器TreeBin中,内部将单节点树转换成红黑树
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
tryPresize(int size):尝试扩容。
1>. 寻找贴近扩容的大小容量
2>. 自旋扩容,<0时,说明在扩容,则不扩容。
2.1>. 如果数组还没有初始化,则执行初始化
2.2>. 扩容后的大小<=sizeCtl,说明当前数组已经满足需要扩容的容量。或者当前数组长度>容量上限,没法分配容量了,则退出
2.3>. 执行扩容,如果处于正在扩容则添加当前线程一起扩容。
// 尝试扩容
private final void tryPresize(int size) {
// 1:寻找贴近扩容的大小容量
// 扩容大小>=最大的一半,直接设置成最大容量
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
// 当size < 0.5*size时,size最高位<<1位,其余补0
// size >= 0.5*size时,size最高位<<2位,其余补0
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 2:自旋扩容,<0时,说明在扩容,则不扩容。
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 2.1:如果数组还没有初始化,putAll的时候,会执行这儿
if (tab == null || (n = tab.length) == 0) {
// 2.2:sizeCtl可能会发生修改,所以这里在判断一次。
n = (sc > c) ? sc : c;
// 2.3:扩容,并更新sc
// SIZECTL设置-1,表示正在初始化
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//双重检查
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
//sc=3/4*n
sc = n - (n >>> 2);
}
} finally {
// 2.3:更新扩容后的大小
sizeCtl = sc;
}
}
}
// 3:扩容后的大小<=sizeCtl,说明当前数组已经满足需要扩容的容量。或者当前数组长度>容量上限,没法分配容量了,则退出
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
// 4: rs高16为都是0,第16位为1,低15位记录n为二进制时前面有多少0
int rs = resizeStamp(n);
// 4.1:表示正在扩容,这里方法好像永远不会进来。
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// transfer线程数+1,当前线程将加入对transfer的处理
// transfer的时候,sc表示在transfer工作的线程数
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 4.2:没有在初始化或扩容,则开始扩容,sizectl目前第32位为1表示负数,高16~31位记录n为二进制时前面有多少0。
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 4.2.1:移动节点
transfer(tab, null);
}
}
}
2.4.5. addCount()
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
/*
1:累加节点个数
as不为空,说明counterCells数组已创建了,进入条件体继续执行
若为空,则说明数组还没创建,预测竞争线程少,直接cas操作baseCount,更新元素个数
如果成功,则执行下一步,若失败,则进入条件体继续执行
*/
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
// 标记未发生竞争
boolean uncontended = true;
// 1.1:数组为空
if (as == null || (m = as.length - 1) < 0 ||
// 1.2:给当前线程随机生成一个数,获取counterCells对应值,若为null,则进入条件体
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// 1.3:条件语句执行到这里,说明counterCells不为空,且有值.则尝试修改个数
// 修改失败,则标记有很多线程竞争
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 1.4:继续累加次数
fullAddCount(x, uncontended);
return;
}
// 1.5没开启容量检查则直接返回
if (check <= 1)
return;
// 1.6:计算CounterCell总个数
s = sumCount();
}
// 走到这里,s赋值为表中所有节点个数总和。
// 2:检查容量,执行扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 2.1:数组元素总个数达到扩容的阀值,且table不为空,且数组长度小于最大容量
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根据length获取一个标识符,高16位置0,第16位为1,低15位存当前n扩容标识
int rs = resizeStamp(n);
// 2.2:sc<0说明正在扩容,则去辅助扩容
if (sc < 0) {
// 如果 sc 的高 16 位也就是当前n扩容标识,不等于标识符,这说明扩容的容量变化了,不是当前原容量扩容
// 如果 sc == 标识符 + 1
// (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,
// 当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
// 如果 sc == 标识符 + 65535(帮助线程数已经达到最大)
// 如果 nextTable == null(结束扩容了)
// 如果 transferIndex <= 0 (已经有足够线程分配迁移数据啦,不需要参与迁移啦。)
// 结束循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// sc+1,帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 2.3:初始化扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
// 高第16位为1,显示负数
// 高15位容量n扩容标志
// 低16位,并行扩容线程数+1
(rs << RESIZE_STAMP_SHIFT) + 2))
// 扩容,第二个参数代表新表,传入null,表示第一次初始化新表
transfer(tab, null);
// 2.4:再次更新节点次数,用于下一次遍历扩容。
s = sumCount();
}
}
}
addCount(long x, int check):累加元素个数
1>. 累加节点个数。counterCells数组已创建了,或者CAS累加基本计数器失败,则更新元素个数。
1.1>. 没开启容量检查则直接返回
1.2>. 重新计算CounterCell总个数
2>. 检查容量,执行扩容。自旋扩容,需要满足数组节点元素个数达到扩容的阀值,且table不为空,且数组长度小于最大容量条件
2.1>. sizeCtl<0,则辅助扩容
2.2>. 初始化扩容
2.3>. 更新数组节点元素总个数。
fullAddCount() 分析如下:
private final void fullAddCount(long x, boolean wasUncontended) {
// h作用:将线程和数组中的不用元素对应起来,尽量避免线程争用同一数组元素。
// 可理解为创建一个hash值与当前线程绑定,这样当前线程每次访问的只会是CounterCell数组中固定的一个位置的数。
// 当出现争抢时,ThreadLocalRandom.advanceProbe(h);重新更新线程的探针哈希值,让线程去使用另一个数组元素
int h;
// 1:如果当前线程随机数为0,则强制初始一个值
if ((h = ThreadLocalRandom.getProbe()) == 0) {
//初始化当前线程的探针哈希值
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
//设置无竞争
wasUncontended = true;
}
// 标记是否发生碰撞
boolean collide = false; // True if last slot nonempty
// 2:自旋累加节点次数。
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// 2.1:counterCells不为null
if ((as = counterCells) != null && (n = as.length) > 0) {
// 2.1.1:当前线程所在格子为空
if ((a = as[(n - 1) & h]) == null) {
//锁未占用
if (cellsBusy == 0) { // Try to attach new Cell
// 新建CounterCell
CounterCell r = new CounterCell(x); // Optimistic create
// 再次检查锁未占用,尝试加锁
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 表示创建CounterCell是否成功状态
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
// 再次检查counterCells不为空,且格子为占用
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 设置格子
rs[j] = r;
// 设置创建状态
created = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 正确创建则退出循环
if (created)
break;
continue; // Slot is now non-empty
}
}
// 表明cellsBusy=1锁上发生竞争,则重新生成随机数,进行下次循环
collide = false;
}
// 2.1.2:CAS 失败,重新继续
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 执行到这里说明wasUncontended=true,认为无竞争
// 且所在槽有值
// 2.1.3:尝试直接累加
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// 2.1.4:数组发生变化,说明发生了扩容或者数组长度>=cpu核心数,
// 则认为无碰撞
// 当扩容超过限制后,则会不停的执行3和4,直到2成功
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
// 2.1.5:数组没变化且数组长度<CPU核心数,且collide认为无碰撞,则设置有碰撞
else if (!collide)
collide = true;
// 2.1.6:执行到这里,说明数组没变化,且有碰撞,则需要扩容
else if (
// 无锁
cellsBusy == 0 &&
//尝试加锁
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
//2.1.6.1:counterCells没发生变化,扩容counterCells一倍容量
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 2.1.6.2:说明线程出现争抢,则更改当前线程的探针哈希值,进行下次循环
h = ThreadLocalRandom.advanceProbe(h);
}
// 2.2:counterCells为空,则尝试加锁
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
// 再次判断counterCells是否有变化
if (counterCells == as) {
// 新建长度2的数组
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
// 标记初始化完成
init = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 跳出循环
if (init)
break;
}
// 2.3:数组为空,抢锁失败,则尝试直接累加baseCount值
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
fullAddCount(long x, boolean wasUncontended):
1>. 如果当前线程随机数为0,则强制初始一个值,用于访问CounterCell数组中固定的元素。
2>. 自旋累加节点次数
2.1>. counterCells不为null
2.1.1>. 当前线程所在格子为空,抢占锁并添加元素值,成功则退出。
2.1.2>. 当前线程格子直接CAS累加次数,成功则退出。
2.1.3>. counterCells数组没变化,且有碰撞,则需要扩容。CAS加锁成功后,扩容。扩容成功,则跳过当前循环,继续执行下一次循环。
2.1.4>. 线程出现争抢,则更改当前线程的探针哈希值,进行下次循环
2.2>. counterCells为空,则尝试加锁。加锁成功后,再次判断counterCells是否有变化,没变化则初始化counterCells,并添加节点次数,跳出循环。
3>. 数组为空,抢锁失败,则尝试直接累加baseCount值。
sumCount():计算CounterCell总个数。
final long sumCount() {
//遍历累加CounterCell数组值到baseCount
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
2.5. 获取元素
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 运用键key的hashCode()计算出哈希地址
int h = spread(key.hashCode());
// 如果table不为空 且 table长度大于0 且 计算出的下标上bucket不为空,
// 则代表这个bucket存在,进入到bucket中查找,
// 其中(n - 1) & h为计算出键key相对应的数组下标的算法。
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果哈希地址、键key相同则表示查找到,返回value,这里查找到的是头节点。
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果bucket头节点的哈希地址小于0,则代表bucket为红黑树,在红黑树中查找。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 如果bucket头节点的哈希地址不小于0,则代表bucket为链表,遍历链表,在链表中查找。
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
find() 查询节点:
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
//对链表的查询
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
//对ForwardingNode类型的node查询,这里没啥用。
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
//对红黑树的查询,调用红黑树的find方法查询
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
总体流程:
1>. 调用spread()方法计算key的hashCode()获得哈希地址。
2>. 计算出键key所在的下标,算法是(n - 1) & h,如果table不为空,且下标上的bucket不为空,则到bucket中查找。
3>. 如果bucket的头节点的哈希地址小于0,则代表这个bucket存储的是红黑树,否则是链表。
4>. 到红黑树或者链表中查找,找到则返回该键key的值,找不到则返回null。
2.6. remove() & replace()
public V remove(Object key) {
// 参数2为空,表示删除
return replaceNode(key, null, null);
}
public V replace(K key, V value) {
if (key == null || value == null)
throw new NullPointerException();
return replaceNode(key, value, null);
}
/**
* {@inheritDoc}
*
* @throws NullPointerException if any of the arguments are null
* key映射有值,值为oldValue,则更新
*/
public boolean replace(K key, V oldValue, V newValue) {
if (key == null || oldValue == null || newValue == null)
throw new NullPointerException();
return replaceNode(key, newValue, oldValue) != null;
}
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 空判断
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
// 正在扩容,则加入扩容队伍
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
// 双重检查加锁,DCL
if (tabAt(tab, i) == f) {
// 对链表的处理
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
// 匹配key
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
// 匹配value
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
// 修改
if (value != null)
e.val = value;
// value=null,说明执行删除操作
else if (pred != null)
pred.next = e.next;
// 当匹配的值是first节点的处理
else
setTabAt(tab, i, e.next);
}
break;
}
// 临时保存上一个节点,便于执行删除节点操作
pred = e;
// 执行最后一个节点,则退出
if ((e = e.next) == null)
break;
}
}
// 红黑树的处理
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
// 个数-1,参数二:标记不需要检测是否需要扩容
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
2.7. clear()
clear():清除节点
public void clear() {
// 记录删除个数,负数表示。
long delta = 0L; // negative number of deletions
int i = 0;
Node<K,V>[] tab = table;
// 遍历节点删除
while (tab != null && i < tab.length) {
int fh;
Node<K,V> f = tabAt(tab, i);
// 1:为null直接跳过
if (f == null)
++i;
else if ((fh = f.hash) == MOVED) {
// 2:若处于迁移态,则帮助去迁移。
tab = helpTransfer(tab, f);
// 3:迁移完成后,重置数组索引,重头开始遍历。
i = 0; // restart
}
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> p = (fh >= 0 ? f :
(f instanceof TreeBin) ?
((TreeBin<K,V>)f).first : null);
// 逐个删除元素
while (p != null) {
--delta;
p = p.next;
}
// 最后一个元素置为null,自此该节点删除。
setTabAt(tab, i++, null);
}
}
}
}
// 倘若删除了节点,则修改元素个数,并不扩容。
if (delta != 0L)
addCount(delta, -1);
}
2.8. size()
size():获取大小
public int size() {
//size和isEmpty一样,返回一个近似值,
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
2.9. mappingCount()
mappingCount():获取大小,官方推荐
public long mappingCount() {
long n = sumCount();
// 忽略瞬间的负值
return (n < 0L) ? 0L : n; // ignore transient negative values
}
2.10. unsafe 方法
在ConcurrentHashMap中,大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁,SVN的思想是比较类似的。
unsafe代码块控制了一些属性的修改工作,比如最常用的SIZECTL 。 在这一版本的concurrentHashMap中,大量应用来的CAS方法进行变量、属性的修改工作。 利用CAS进行无锁操作,可以大大提高性能。
/*
* 用来返回节点数组的指定位置的节点的原子操作
*/
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/*
* cas原子操作,在指定位置设定值
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
/*
* 原子操作,在指定位置设定值
*/
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
三. 常见问题
3.1. 什么时候扩容?
1. 单节点容量>=8且容量<64,则扩容一倍。
2. 当数组中元素达到了 sizeCtl 的数量的时候,则会调用transfer方法来进行扩容。
下图代码参见2.4.5节 addCount()。
3.2. 什么时候树化和退化?
1. 单个节点元素>=8个,且数组元素<=64,树化。
2. 元素节点<=6,则退化成链表。
3.3. JDK1.8为什么放弃分段锁?
在JDK1.5~1.7版本,Java使用了分段锁机制实现ConcurrentHashMap。简而言之,ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个Hashtable;这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。因此,ConcurrentHashMap在多线程并发编程中可以实现多线程put操作。
段Segment继承了重入锁ReentrantLock,有了锁的功能,每个锁控制的是一段,当每个Segment越来越大时,锁的粒度就变得有些大了。
分段锁的优势在于保证在操作不同段 map 的时候可以并发执行,操作同段 map 的时候,进行锁的竞争和等待。这相对于直接对整个map同步synchronized是有优势的。
缺点在于分成很多段时会比较浪费内存空间(不连续,碎片化); 操作map时竞争同一个分段锁的概率非常小时,分段锁反而会造成更新等操作的长时间等待; 当某个段很大时,分段锁的性能会下降。
3.4. JDK1.8的map实现
和HashMap一样,JDK1.8中ConcurrentHashmap采用的底层数据结构为数组+链表+红黑树的形式。数组可以扩容,链表可以转化为红黑树。
3.5. 为什么不用ReentrantLock而用synchronized?
1. 减少内存开销:如果使用ReentrantLock则需要节点继承AQS来获得同步支持,增加内存开销,而JDK1.8中只有头节点需要进行同步。
2. 内部优化:synchronized则是JVM直接支持的,JVM能够在运行时作出相应的优化措施----锁粗化、锁消除、锁自旋等等。
3.6. 多个线程又是如何同步处理的呢?
1. 同步处理主要是通过 synchronized 和 unsafe 两种方式来完成的。
2. 在取得sizeCtl、某个位置的Node的时候,使用的都是unsafe的方法,来达到并发安全的目的
当需要在某个位置设置节点的时候,则会通过synchronized的同步机制来锁定该位置的节点。
3. 在数组扩容的时候,则通过处理的步长和fwd节点来达到并发安全的目的,通过设置hash值为MOVED。
4. 当把某个位置的节点复制到扩张后的table的时候,也通过synchronized的同步机制来保证线程安全。
3.7. 为什么 key 和 value 不允许为 null?
在 HashMap 中,key 和 value 都是可以为 null 的,但是在 ConcurrentHashMap 中却不允许,这是为什么呢?
作者 Doug Lea 本身对这个问题有过回答,在并发编程中,null 值容易引来歧义, 假如先调用 get(key) 返回的结果是 null,那么我们无法确认是因为当时这个 key 对应的 value 本身放的就是 null,还是说这个 key 值根本不存在,这会引起歧义,如果在非并发编程中,可以进一步通过调用 containsKey 方法来进行判断,但是并发编程中无法保证两个方法之间没有其他线程来修改 key 值,所以就直接禁止了 null 值的存在。
而且作者 Doug Lea 本身也认为,假如允许在集合,如 map 和 set 等存在 null 值的话,即使在非并发集合中也有一种公开允许程序中存在错误的意思,这也是 Doug Lea 和 Josh Bloch(HashMap作者之一) 在设计问题上少数不同意见之一,而 ConcurrentHashMap 是 Doug Lea 一个人开发的,所以就直接禁止了 null 值的存在。