Java Map本质不是线程安全的,HashTable和Collections同步包装器(Synchronized Wrapper)在并发场景下性能低。Java还为实现 Map 的线程安全提供了并发包,保证线程安全的方式从synchronize简单方式到精细化,比如ConcurrentHashMap。总体来说,并发包内提供的容器通用场景,远优于早期的简单同步实现。ConcurrentHashMap一直在演进,在Java 8中发生了很大的变化(Java 7也有不少更新)。
注:Java 并发包(java.util.concurrent)提供了线程安全容器类,如下,
- 各种并发容器,比如ConcurrentHashMap、CopyOnWriteArrayList。
- 各种线程安全队列(Queue/Deque),如ArrayBlockingQueue、SynchronousQueue。
- 各种有序容器的线程安全版本等。
早期 ConcurrentHashMap 实现基于:
- 分离锁,将内部进行分段(Segment),里面则是 HashEntry 的数组,和 HashMap 类似,哈希相同的条目是以链表形式存放。
- HashEntry内部使用 volatile 的 value 字段来保证可见性,也利用了不可变对象的机制改进利用 Unsafe 提供的底层能力(比如 volatile access)去直接完成部分操作,以最优化性能,毕竟 Unsafe 中的很多操作都是 JVM intrinsic 优化过的。
早期 ConcurrentHashMap 内部结构使用分段设计,并发操作时锁定相应段,避免类似HashTable整体同步问题来提高性能。构造时Segment的数量由concurrencyLevel决定,默认是16,也可以在相应构造函数直接指定。Java需要它是2的幂数,如果输入是类似15这种非幂值,会被自动调整到16这种2的幂数。
JDK 7 基本操作源码(分离锁实现并发控制)
get方法保证的是可见性,并没有什么同步逻辑,
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key.hashCode());
//利用位操作替换普通数学运算
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 以Segment为单位,进行定位
// 利用Unsafe直接进行volatile access
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//省略
}
return null;
}
put操作先是通过二次哈希避免哈希冲突,然后以Unsafe调用方式直接获取相应的Segment,然后进行线程安全的put操作,核心逻辑实现在put的内部方法中。并发写操作时,ConcurrentHashMap会获取再入锁来保证数据一致性,Segment本身就是基于ReentrantLock的扩展实现,所以在并发修改期间,相应Segment是被锁定的。在最初阶段,进行重复性的扫描来确定相应key值是否已经在数组里面,进而决定是更新还是放置。重复扫描、检测冲突是ConcurrentHashMap的常见技巧。HashMap可能发生的扩容问题,在ConcurrentHashMap中同样存在。不过有一个明显区别,它进行的不是整体的扩容,而是单独对Segment进行扩容。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 二次哈希,以保证数据的分散性,避免哈希冲突
int hash = hash(key.hashCode());
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// scanAndLockForPut会去查找是否有key相同Node
// 无论如何,确保获取锁
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
// 更新已有value...
}
else {
// 放置HashEntry到特定位置,如果超过阈值,进行rehash
// ...
}
}
} finally {
unlock();
}
return oldValue;
}
size 方法的实现涉及分离锁的一个副作用。如果不进行同步,简单地计算所有 Segment 的总值,可能会因为并发put导致结果不准确,但直接锁定所有 Segment 进行计算会非常昂贵,其实分离锁也限制了 Map 初始化等操作。ConcurrentHashMap是通过重试机制(RETRIES_BEFORE_LOCK,指定重试次数2)来试图获得可靠值。如果没有监控到发生变化(通过对比Segment.modCount)就直接返回,否则获取锁进行操作。
JDK 8 及之后 基本操作源码(CAS无锁并发 + synchronized)
内部存储变得和 HashMap 结构非常相似,同样是大的桶(bucket)数组,内部也是一个个链表结构(bin),同步粒度更细致一些。内部仍然有 Segment 定义,但仅仅是为了保证序列化时的兼容性,不再有任何结构上的用处。因不再使用 Segment,初始化操作大大简化,修改为 lazy-load 形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点。数据存储利用 volatile 来保证可见性。使用 CAS 等操作,在特定场景进行无锁并发操作。使用 Unsafe、LongAdder 等底层手段,进行极端情况的优化。
内部实现结构中,key 是 final 的,在生命周期中一个键值对的 key 发生变化是不可能的,val 声明为 volatile 来保证可见性。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
// …
}
get 方法和构造函数比较简单,直接看并发 put 方法实现。初始化在 initTable 实现,这是一个典型的 CAS 使用场景,volatile 的 sizeCtl 作为互斥手段,如果发现竞争性的初始化,就 spin 在那里,等待条件恢复;否则利用 CAS 设置排他标志。如果成功则初始化;否则重试。当 bin 为空时,同样是没有必要锁定,也是以 CAS 操作去放置。
这里同步用的是synchronized,不是 ReentrantLock,现代 JDK 中 synchronized 已经被不断优化,可以不再过分担心性能差异。相比于 ReentrantLock,它可以减少内存消耗,这是个非常大的优势。与此同时,更多细节实现通过使用 Unsafe 进行了优化,例如 tabAt 就是直接利用 getObjectAcquire,避免间接调用的开销。
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 利用CAS去进行无锁线程安全操作,如果bin是空的
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // 不加锁,进行检查
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
synchronized (f) {
// 细粒度的同步修改操作...
}
}
// Bin超过阈值,进行树化
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果发现冲突,进行spin等待
if ((sc = sizeCtl) < 0)
Thread.yield();
// CAS成功返回true,则进入真正的初始化逻辑
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
size 操作真正的逻辑是在 sumCount 方法中,思路和以前一样,都是分而治之计数,然后求和处理,但实现是用的 CounterCell。 它的数值更加准确吗?数据一致性是怎么保证的?CounterCell 是基于 java.util.concurrent.atomic.LongAdder 实现的,是 JVM 一种利用空间换取更高效率的方法,利用了Striped64内部的复杂逻辑。这个东西非常小众,大多数情况下还是建议使用 AtomicLong,足以满足绝大部分应用的性能需求。
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
Have Fun