介绍
ConcurrentHashMap是线程安全且高效的HashMap。
为什么要使用ConcurrentHashMap
线程不安全的HashMap
HashMap多线程情况下put操作会出现并发安全问题,包括
死循环、数据丢失(jdk7)以及数据覆盖(jdk8)
。jdk7中,多线程put操作扩容采用头插法,是导致死循环(链表成环)以及数据丢失问题的根本原因(具体是transfer方法中的两行代码导致);
jdk8中,jdk8中将扩容操作改为尾插法,解决了死循环和数据丢失问题但是数据覆盖问题仍未解决,因此HashMap是线程不安全的。
final HashMap<String, String> map = new HashMap<>(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}, "ftf" + i).start();
}
}
}, "ftf");
t.start();
// 正常情况:线程执行结束,会从这返回 (join方法)
// 但是由于jdk7多线程HashMap#put导致死循环,程序永远不会停止
t.join();
jdk7中执行上述代码,HashMap进行put操作会引起死循环,导致CPU利用率接近100%。
效率低下的Hashtable
Hashtable容器使用synchronized来保证线程安全,在线程竞争激烈的情况下HashTable的效率非常低下。
当一个线程访问Hashtable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,竞争越激烈效率越低。
JDK1.8 版本 ConcurrentHashMap 做的改进
在 JDK1.7 版本中,ConcurrentHashMap 由数组 + Segment + 锁分段实现,其内部分为一个个段(Segment)数组,Segment 通过继承 ReentrantLock 来进行加锁,通过每次锁住一个 segment 来降低锁的粒度而且保证了每个 segment 内的操作的线程安全性,从而实现全局线程安全。
假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。
这么做的缺陷就是每次通过 hash 确认位置时需要 2 次才能定位到当前 key 应该落在哪个槽:
- 通过 hash 值和 段数组长度-1 进行位运算确认当前 key 属于哪个段,即确认其在 segments 数组的位置。
- 再次通过 hash 值和 table 数组长度 - 1进行位运算确认其所在。
为了进一步优化性能,在 jdk1.8 版本中,对 ConcurrentHashMap 做了优化,取消了分段锁的设计,取而代之的是通过 cas 操作和 synchronized 关键字来实现优化
,而扩容的时候也利用了一种分而治之的思想来提升扩容效率,在 JDK1.8 中 ConcurrentHashMap 的存储结构和 HashMap 基本一致(数组 + 链表 +红黑树)
JDK1.8 ConcurrentHashMap原理
JDK1.7 ConcurrentHashMap的put()方法
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
// 求key的hash值
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// 元素的hashCode再散列,让数字每一位都参与到散列运算中
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
求key的hash值时,会对元素的hashCode进行一次再散列
(能让数字的每一位都参加到散列运算当中)。目的是减少散列冲突,使元素能够均匀地分布在不同的Segment上,从而提高容器的存取效率。
JDK1.8 volatile修饰的节点数组
// volatil修饰,保证多线程可见性,禁止指令重排
transient volatile Node<K,V>[] table;
JDK1.8 ConcurrentHashMap的put()方法
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
// 首先进来直接一个死循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// tab未被初始化,则先将tab初始化,结束本轮循环,进入下轮循环
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 第二轮循环,通过key的hash值来判断table中是否存在相同的key,如果不存在,执行casTabAt()方法。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// CAS操作无需加锁(效率得以提升),新增一个新的节点
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// tab的状态,MOVED表示在扩容中,如果在扩容中,帮助其扩容。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
// 最后一个分支
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
前面所有分支都不满足,进入最后一个分支:
else {
V oldVal = null;
// 加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
key
的hash
值位置不为null
,表示发生了hash
冲突,此时节点就要通过链表的形式存储这个插入的新值(更新或者插入链表末尾)。
该分支,首先加排它锁,只有在发生hash冲突的时候才加了排它锁
。
if (tabAt(tab, i) == f) {
if (fh >= 0) {
重新判断当前节点是不是第二条判断过的节点,如果不是,表示节点被其他线程改过了。
如果是,再次判断是否在扩容中,如果是,进入下一轮循环,
如果不是,开始新节点操作。
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到一个hash值相同,且key也完全相同的节点,更新这个节点。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 如果未找到,往链表最后插入这个新节点。
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
由于更新或者插入操作在排他锁中,这些操作都可以直接操作(无需考虑线程安全问题)。
小结
JDK1.8 put流程:
- 做put操作时,首先进入乐观锁
- 先
判断容器是否初始化
- 没初始化则初始化容器
- 已经初始化,则
判断hash位置的节点是否为空
- 如果为空,则通过CAS操作进行插入,直接结束
- 节点不为空,再判断容器是否在扩容中,如果在扩容,则帮助其扩容
- 如果没有扩容,则进行最后一步,先加锁,然后找到hash值相同的那个节点(
hash冲突
) - 循环判断这个节点上的链表,决定做覆盖操作还是插入操作,操作完毕,结束循环。
JDK1.8 ConcurrentHashMap的get()方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 链表头节点满足条件直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 循环链表,查找节点
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get操作无需加锁,table
有volatile
关键字修饰,保证每次获取值都是最新的。直接循环查找链表满足节点即可。
使用场景
多线程环境下,读多写少,性能比较高
。
乐观锁,认为更新操作不会被其他线程影响。因此在更新少的情况下性能高。