并发安全的集合?
并发不安全的集合
在 Java 中,有一些集合是不安全的,因为它们不是线程安全的。这意味着如果多个线程同时访问这些集合,那么它们可能会出现不可预料的行为。
Java 中的并发不安全的集合包括:
ArrayList
LinkedList
HashSet
HashMap
LinkedHashSet
如果想在多线程环境中使用这些集合,则必须使用同步机制来保护它们,例如使用 Collections.synchronizedList
或 Collections.synchronizedSet
这些方法来包装这些集合。
Map<String,String> myMap=new HashMap<>();
Map<String, String> synchronizedMap = Collections.synchronizedMap(myMap);
复制代码
并发安全的集合
在 Java 中,同样有许多内置的并发安全的集合类可供使用,下面列举的这些类除了Vector
和Hashtable
都在 java.util.concurrent
包中定义。
下面是一些常用的并发安全的集合类:
-
Vector
-
HashTable
-
ConcurrentHashMap
线程安全的哈希表,可以在多个线程之间安全地进行读写操作。
-
CopyOnWriteArrayList
线程安全的动态数组,写入时会进行复制,因此读操作是非常高效的,但写操作会比较慢。
-
BlockingQueue
线程安全的阻塞队列,当队列为空时,试图从队列中获取元素的线程会被阻塞;当队列已满时,试图向队列中添加元素的线程也会被阻塞。
常用的实现包括
ArrayBlockingQueue
和LinkedBlockingQueue
。 -
ConcurrentLinkedQueue
线程安全的无界队列,它使用链接节点来实现队列,因此插入和删除操作非常高效。
这些并发安全的集合类可以使用 java.util.concurrent.locks
包中的锁和并发控制机制来进行更细粒度的同步。
注意:尽管这些类是线程安全的,但仍然建议使用同步机制来确保在多线程环境下的正确执行。
HashMap,HashTable存在的问题
我们这一章节谈论「问题」的前提是并发环境下,借由并发环境下这两个集合的问题,我们能理解为什么需要
ConcurrentHashMap
以及假如实现ConcurrentHashMap
需要怎样的设计
HashMap
线程不安全
HashMap
的线程不安全是由于它的实现方式造成的。HashMap
底层使用链表来存储映射关系,如果多个线程同时访问同一个 HashMap
,则可能会导致竞争条件,导致线程不安全。
例如,如果两个线程同时调用 HashMap
的 put
方法,则可能会导致其中一个线程的更新被覆盖,或者两个线程同时调用 remove
方法,导致某些条目被意外删除。
HashTalbe
线程安全,但是慢
Hashtable
之所以效率低下主要是因为其实现使用了synchronized
关键字对put
等操作进行加锁
而synchronized
关键字加锁是对整个对象进行加锁,也就是说在进行put
等修改Hash
表的操作时,锁住了整个Hash
表,从而使得其表现的效率低下。
ConcurrentHashMap 原理
锁分段技术
什么是锁分段技术?
HashTable
容器在竞争激烈的并发环境下效率低下,是因为所有访问HashTable
的线程都必须竞争同一把锁。
那么假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同的数据段的数据时,线程之间就不会存在锁竞争,从而有效地提高并发访问效率,这就是ConcurrentHashMap
所使用的锁分段技术。
也就是将数据分成一段一段的储存,然后给每一段数据配一把锁,当一个线程占用锁访问一个段数据时,其他的段数据也是可以被其他线程访问的。
另外,ConcurrentHashMap
是可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作时能够将锁的粒度保持尽量的小,不用对整个ConcurrentHashMap
加锁。
在理解了锁分段技术之后,我们看
ConcurrentHashMap
结构就会更加清晰但是需要注意的是锁分段技术是JDK1.7中的实现策略,在JDK1.8中使用
CAS
和synchronized
来加锁。事不宜迟,接着就到我们的源码环节!
ConcurrentHashMap 源码
ConcurrentHashMap
的源码在JDK1.7和JDK1.8有所不同,所以很多源码我们会分两个版本进行讨论。
ConcurrentHashMap
继承关系
ConcurrentHashMap 内部结构
-
JDK1.7
由图中可以看出,我们可以将整张
ConcurrentHashMap
划分成不同的段(Segment
),每个Segment
可以看做一个HashTable
,每个HashTable
使用不同的锁。final Segment<K,V>[] segments; 复制代码
简单来说
ConcurrentHashMap
是一个Segment
数组,Segment
通过继承ReentrantLock
来进行加锁,所以每次需要加锁的操作锁住的是一个segment
,这样只要保证每个Segment
是线程安全的,也就实现了全局的线程安全。static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; // HashEntry数组 transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } // ... } 复制代码
Segment
内部维护了一个链表数组(上面的table
),也就是说一个entry
即链表中的一个实体。static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } // ... } 复制代码
-
JDK1.8
在JDK1.7之前,
ConcurrentHashMap
是通过分段锁机制来实现的,所以其最大并发度受Segment
的个数限制。因此,在JDK1.8中,
ConcurrentHashMap
的实现原理摒弃了这种设计,而是选择了与HashMap
类似的数组+链表+红黑树的方式实现,而加锁则采用CAS
和synchronized
实现。transient volatile Node<K,V>[] table; 复制代码
注意到我们上面标注的有5种不同的节点类型,它们分别为:
Node
、TreeBin
、TreeNode
、ForwardingNode
、ReservationNode
对于
Node
、TreeNode
我们可以理解,毕竟一个是链表节点,一个是树节点。可是为什么红黑树的根节点是TreeBin
而不是TreeNode
,另外ForwardingNode
和ReservationNode
又是什么呢?接着我们依次解决这几个问题。
-
为什么用
TreeBin
,不用TreeNode
?对于红黑数的左旋和右旋的平衡操作,
ConcurrentHashMap
用TreeBin
充当代理来进行这些操作,而TreeNode
节点只有查找方法。 -
ForwardingNode
和ReservationNode
各是什么?-
ForwardingNode
在转换操作时插入到头部的一个节点,和
ConcurrentHashMap
的扩容,缩容有关系。 -
ReservationNode
起到一个占位的作用
-
-
初始化
-
JDK1.7
无参构造方法:
public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } 复制代码
无参构造方法调用了有参构造,传入三个常量参数
/** * 默认初始化容量 */ static final int DEFAULT_INITIAL_CAPACITY = 16; /** * 默认负载因子 */ static final float DEFAULT_LOAD_FACTOR = 0.75f; /** * 默认并发级别 */ static final int DEFAULT_CONCURRENCY_LEVEL = 16; 复制代码
接着看被调用的有参构造:
@SuppressWarnings("unchecked") public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) { // 参数校验 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // 校验并发级别大小,大于 1<<16,重置为 65536 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 2的多少次方 int sshift = 0; int ssize = 1; // 这个循环可以找到 concurrencyLevel 之上最近的 2的次方值 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 记录段偏移量 this.segmentShift = 32 - sshift; // 记录段掩码 this.segmentMask = ssize - 1; // 设置容量 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // c = 容量 / ssize ,默认 16 / 16 = 1,这里是计算每个 Segment 中的类似于 HashMap 的容量 int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; //Segment 中的类似于 HashMap 的容量至少是2或者2的倍数 while (cap < c) cap <<= 1; // 创建 Segment 数组,设置 segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; } 复制代码
参数的含义如下:
-
initialCapacity
初始容量,这个值指的是整个
ConcurrentHashMap
的初始容量,实际操作的时候需要平均分给每个Segment
。 -
loadFactor
负载因子,我们知道
Segment
数组不可以扩容,所以这个负载因子是给每个Segment
内部使用的。 -
concurrencyLevel
可以叫成 并行级别、并发数、
Segment
数。默认是 16,也就是说
ConcurrentHashMap
有 16 个Segments
,所以理论上,在这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的Segment
上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。
整体流程如下:
- 必要参数校验。
- 校验并发级别
concurrencyLevel
大小,如果大于最大值,重置为最大值。无参构造默认值是 16. - 寻找并发级别
concurrencyLevel
之上最近的 2 的幂次方值,作为初始化容量大小,默认是 16。 - 记录
segmentShift
偏移量,这个值为【容量 = 2 的N次方】中的 N,在后面 Put 时计算位置时会用到。默认是 32 - sshift = 28. - 记录
segmentMask
,默认是 ssize - 1 = 16 -1 = 15. - 初始化
segments[0]
,默认大小为 2,负载因子 0.75,扩容阀值是 2*0.75=1.5,插入第二个值时才会进行扩容。
初始化完成我们就获得了一个
Segment
数组。-
定位段的方法
这里的定位段的方法主要用的就是上面出现的两个量:
segmentShift
,segmentMask
为了加快定位段以及段中hash槽的速度,每个段hash槽的的个数都是2^n,这使得通过位运算就可以定位段和段中hash槽的位置。
当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪个段中,后四位决定段中的坐标。
对应源码中的
segmentFor
方法:final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; } 复制代码
-
-
JDK1.8
初始化方法为
initTable
方法:private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。 if ((sc = sizeCtl) < 0) // 让出 CPU 使用权 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; } 复制代码
从源码中可以发现
ConcurrentHashMap
的初始化是通过自旋和CAS
操作完成的。里面需要注意的是变量sizeCtl
,它的值代表着table 初始化和扩容的状态标识:- -1: 说明正在初始化
- -N: 说明有N-1个线程正在进行扩容
>0
:数组初始化后的容量
- 0:默认初始值
put 方法
-
JDK1.7
源码如下:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); // hash 值无符号右移 28位(初始化时获得),然后与 segmentMask=15 做与运算 // 其实也就是把高4位与segmentMask(1111)做与运算 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment // 如果查找到的 Segment 为空,初始化 s = ensureSegment(j); return s.put(key, hash, value, false); } private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 这里看到为什么之前要初始化 segment[0] 了, // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k] // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了 Segment<K,V> proto = ss[0]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 内部的数组 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 再次检查一遍该槽是否被其他线程初始化了。 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; } 复制代码
整体流程:
-
计算要
put
的 key 的位置,获取指定位置的Segment
。 -
如果指定位置的
Segment
为空,则初始化这个Segment
.-
初始化
Segment
流程(ensureSegment
方法流程)- 检查计算得到的位置的
Segment
是否为null. - 为 null 继续初始化,使用
Segment[0]
的容量和负载因子创建一个HashEntry
数组。 - 再次检查计算得到的指定位置的
Segment
是否为null. - 使用创建的
HashEntry
数组初始化这个Segment
. - 自旋判断计算得到的指定位置的
Segment
是否为null,使用CAS
在这个位置赋值为Segment
.
- 检查计算得到的位置的
-
-
Segment.put
插入 key,value 值。
接着来看
Segment.put
方法的源码:final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 获取 ReentrantLock 独占锁,获取不到,scanAndLockForPut 获取。 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; // 计算要put的数据位置 int index = (tab.length - 1) & hash; // CAS 获取 index 坐标的值 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { // 检查是否 key 已经存在,如果存在,则遍历链表寻找位置,找到后替换 value K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { // first 有值没说明 index 位置已经有值了,有冲突,链表头插法。 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 容量大于扩容阀值,小于最大容量,进行扩容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else // index 位置赋值 node,node 可能是一个元素,也可能是一个链表的表头 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; } 复制代码
由于
Segment
继承了ReentrantLock
,所以Segment
内部可以很方便的获取锁,put
流程就用到了这个功能。tryLock()
获取锁,获取不到使用scanAndLockForPut
方法继续获取。
- 计算
put
的数据要放入的index
位置,然后获取这个位置上的HashEntry
。
-
遍历
put
新元素,为什么要遍历?因为这里获取的HashEntry
可能是一个空元素,也可能是链表已存在,所以要区别对待。-
如果这个位置上的
HashEntry
不存在:- 如果当前容量大于扩容阀值,小于最大容量,进行扩容。
- 直接头插法插入。
-
如果这个位置上的
HashEntry
存在:- 判断链表当前元素 key 和 hash 值是否和要
put
的 key 和 hash 值一致。一致则替换值
-
不一致,获取链表下一个节点,直到发现相同进行值替换,或者链表表里完毕没有相同的。
- 如果当前容量大于扩容阀值,小于最大容量,进行扩容。
- 直接链表头插法插入。
-
如果要插入的位置之前已经存在,替换后返回旧值,否则返回 null.
- 判断链表当前元素 key 和 hash 值是否和要
-
我们再看到第一步中的
scanAndLockForPut
操作,它的含义是获取写入锁:private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node // 循环获取锁 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node // 进到这里说明数组该位置的链表是空的,没有任何元素 // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置 node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else // 顺着链表往下走 e = e.next; } // 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁 // lock() 是阻塞方法,直到获取锁后返回 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && // 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头 // 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法 (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; } 复制代码
这个方法做的操作就是不断的自旋
tryLock()
获取锁。当自旋次数大于指定次数时,使用
lock()
阻塞获取锁。在自旋时顺便获取下 hash 位置的
HashEntry
。到此JDK1.7的
put
方法我们就分析完毕了。 -
-
JDK1.8
put
方法源码如下:public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { // key 和 value 不能为空 if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { // f = 目标位置元素 Node<K,V> f; int n, i, fh;// fh 后面存放目标位置的元素 hash 值 if (tab == null || (n = tab.length) == 0) // 数组桶为空,初始化数组桶(自旋+CAS) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 桶内为空,CAS 放入,不加锁,成功了就直接 break 跳出 if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 使用 synchronized 加锁加入节点 synchronized (f) { if (tabAt(tab, i) == f) { // 说明是链表 if (fh >= 0) { binCount = 1; // 循环加入新的或者覆盖节点 for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 红黑树 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; } 复制代码
- 根据 key 计算出 hashcode 。
- 判断是否需要进行初始化。
- 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用
CAS
尝试写入,失败则自旋保证成功。 - 如果当前位置的
hashcode == MOVED == -1
,则需要进行扩容。 - 如果都不满足,则利用
synchronized
锁写入数据。 - 如果数量大于
TREEIFY_THRESHOLD==8
则要执行树化方法,在treeifyBin
中会首先判断当前数组长度≥64时才会将链表转换为红黑树。
get 方法
-
JDK1.7
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 计算得到 key 的存放位置 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { // 如果是链表,遍历查找到相同 key 的 value。 K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; } 复制代码
- 计算得到 key 的存放位置。
- 遍历指定位置查找相同 key 的 value 值。
-
JDK1.8
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // key 所在的 hash 位置 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 如果指定位置元素存在,头结点hash值相同 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) // key hash 值相等,key值相同,直接返回元素 value return e.val; } else if (eh < 0) // 头结点hash值小于0,说明正在扩容或者是红黑树,find查找 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { // 是链表,遍历查找 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } 复制代码
- 根据 hash 值计算位置。
- 查找到指定位置,如果头节点就是要找的,直接返回它的 value.
- 如果头节点 hash 值小于 0 ,说明正在扩容或者是红黑树,查找之。
- 如果是链表,遍历查找之。
扩容方法
rehash 方法(JDK1.7)
只有JDK1.7有这个
rehash
方法
我们首先回顾一下触发扩容的位置,put
的时候,如果判断该值的插入会导致该 segment
的元素个数超过阈值,那么先进行扩容,再插值。
然后需要注意的是segment
数组不能扩容,扩容是 segment
数组某个位置内部的数组 HashEntry<K,V>[]
进行扩容,扩容后,容量为原来的 2 倍。
老数组里的数据移动到新的数组时,位置要么不变,要么变为 index+ oldSize
,参数里的 node 会在扩容之后使用链表头插法插入到指定位置。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
// 老容量
int oldCapacity = oldTable.length;
// 新容量,扩大两倍
int newCapacity = oldCapacity << 1;
// 新的扩容阀值
threshold = (int)(newCapacity * loadFactor);
// 创建新的数组
HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码,默认2扩容后是4,-1是3,二进制就是11。
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
// 遍历老数组
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 计算新的位置,新的位置只可能是不便或者是老的位置+老的容量。
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
// 如果当前位置还不是链表,只是一个元素,直接赋值
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// 如果是链表了
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
// 新的位置只可能是不便或者是老的位置+老的容量。
// 遍历结束后,lastRun 后面的元素位置都是相同的
for (HashEntry<K,V> last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// ,lastRun 后面的元素位置都是相同的,直接作为链表赋值到新位置。
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
// 遍历剩余元素,头插法到指定 k 位置。
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 头插法插入新的节点
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
复制代码
对于最后的两个 for
循环的含义:
- 第一个
for
是为了寻找这样一个节点,这个节点后面的所有next
节点的新位置都是相同的。然后把这个作为一个链表赋值到新位置。
- 第二个
for
循环是为了把剩余的元素通过头插法插入到指定位置链表。
tryPresize 方法(JDK1.8)
只有JDK1.8中有
tryPresize
方法
这个方法要看懂还要看后面的transfer
方法。
这里的扩容后数组容量是原来的2倍。
// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
private final void tryPresize(int size) {
// c: size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2); // 0.75 * n
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
// 我没看懂 rs 的真正含义是什么,不过也关系不大
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法
// 此时 nextTab 不为 null
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2)
// 我是没看懂这个值真正的意义是什么? 不过可以计算出来的是,结果是一个比较大的负数
// 调用 transfer 方法,此时 nextTab 参数为 null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
复制代码
这个方法的核心在于 sizeCtl
值的操作,首先将其设置为一个负数
然后执行 transfer(tab, null)
,再下一个循环将 sizeCtl
加 1,并执行 transfer(tab, nt)
,之后可能是继续 sizeCtl
加 1,并执行 transfer(tab, nt)
。
所以,可能的操作就是执行 1 次 transfer(tab, null)
+ 多次 transfer(tab, nt)
,这里怎么结束循环的需要看完 transfer
源码才清楚。
transfer
源码
这个方法完成的工作是「数据迁移」。将原来的 tab
数组的元素迁移到新的 nextTab
数组中。
该方法支持多线程执行,外围调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab
参数为 null,之后再调用此方法的时候,nextTab
不会为 null。
transferIndex
配合步长(stride
)用于安排哪个线程执行哪几个任务。
第一个发起数据迁移的线程会将 transferIndex
指向原数组最后的位置,然后从后往前的 stride
个任务属于第一个线程
然后将 transferIndex
指向新的位置,再往前的 stride
个任务属于第二个线程。
依此类推。当然,这里说的第二个线程不是真的一定指代了第二个线程,也可以是同一个线程。
其实就是将一个大的迁移任务分为了一个个任务包
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
// stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,
// 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果 nextTab 为 null,先进行一次初始化
// 前面我们说了,外围会保证第一个发起迁移的线程调用此方法时,参数 nextTab 为 null
// 之后参与迁移的线程调用此方法时,nextTab 不会为 null
if (nextTab == null) {
try {
// 容量翻倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// nextTable 是 ConcurrentHashMap 中的属性
nextTable = nextTab;
// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
transferIndex = n;
}
int nextn = nextTab.length;
// ForwardingNode 翻译过来就是正在被迁移的 Node
// 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED
// 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,
// 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了
// 所以它其实相当于是一个标志。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
/*
* 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看
*/
// i 是位置索引,bound 是边界,注意是从后往前
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 下面这个 while 真的是不好理解
// advance 为 true 表示可以进行下一个位置的迁移了
// 简单理解结局: i 指向了 transferIndex,bound 指向了 transferIndex-stride
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
// 将 transferIndex 值赋给 nextIndex
// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 所有的迁移操作已经完成
nextTable = null;
// 将新的 nextTab 赋值给 table 属性,完成迁移
table = nextTab;
// 重新计算 sizeCtl: n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
// 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,
// 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 任务结束,方法退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
// 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 头节点的 hash 大于 0,说明是链表的 Node 节点
if (fh >= 0) {
// 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,
// 需要将链表一分为二,
// 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
// lastRun 之前的节点需要进行克隆,然后分到两个链表中
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 其中的一个链表放在新数组的位置 i
setTabAt(nextTab, i, ln);
// 另一个链表放在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
else if (f instanceof TreeBin) {
// 红黑树的迁移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果一分为二后,节点数小于等于6,那么将红黑树转换回链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 将 ln 放置在新数组的位置 i
setTabAt(nextTab, i, ln);
// 将 hn 放置在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
}
}
}
}
}
复制代码
所以 transfer
这个方法并没有实现所有的迁移任务,每次调用这个方法只实现了 transferIndex
往前 stride
个位置的迁移工作,其他的需要由外围来控制。
treeifyBin 树化(JDK1.8)
JDK1.8中
ConcurrentHashMap
引入了红黑树,因此只有JDK1.8有这个方法
同样的在put
方法中被触发,在同一个节点的个数超过8个的时候,会调用treeifyBin
方法。
treeifyBin
不一定就会进行红黑树转换,也可能是仅仅做数组扩容。
当数组长度小于64的时候会优先扩充数组。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// MIN_TREEIFY_CAPACITY 为 64
// 所以,如果数组长度小于 64 的时候,其实也就是 32 或者 16 或者更小的时候,会进行数组扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 后面我们再详细分析这个方法
tryPresize(n << 1);
// b 是头节点
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 加锁
synchronized (b) {
if (tabAt(tab, index) == b) {
// 下面就是遍历链表,建立一颗红黑树
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 将红黑树设置到数组相应位置中
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
复制代码