个人对哈希数据结构学习总结 -- 实践篇 -- 上
- 引言
- 最佳实践
- Java篇
- HashMap
- get
- put
- 扩容
- ConcurrentHashMap
- get
- put
- 扩容
- 协作扩容
- 读为什么可以不加锁?
- ThreadLocalMap
- get
- put
- 扩容
- delete
- 为什么遍历到null桶就可以判断key不存在?
- ThreadLocalMap为什么不需要锁?
- 为什么ThreadLocalMap要采用线性探测法?
- 小结
引言
哈希表这个数据结构相信各位都不陌生,无论是高级语言,还是各大数据库底层实现都不离开它,所以本文我想来聊聊我个人对哈希表的一些看法,同时也是对哈希表这个知识点做一次系统性的梳理和总结。
本文主要会分为两大部分:
- 哈希表设计思考(偏理论)
- 最佳实践(结合Java,Go,redis等热门技术来谈谈其中使用到的哈希设计原则)
最佳实践
Java篇
Java部分数据结构分析均基于JDK 11进行讲解
HashMap
我们先来思考一下,如果让我们来设计一个哈希数据结构,我们需要使用哪些属性记录哪些信息呢?
首先,我们要清楚jdk 1.8中hashmap的整体设计思路如下图所示:
hashMap使用链式哈希(Chained Hashing)来解决哈希冲突问题,并且考虑到哈希冲突密集场景下链表过长影响读写性能,使用红黑树替换链表 , 同时在delete元素的恰当时机执行退树操作,节省内存占用情况。
下一步就是思考一个hashmap需要对外提供哪些核心访问接口呢?
- get
- put
get
那么第一步我们先来看看hashmap的get方法时如何实现的:
public V get(Object key) {
Node<K,V> e;
// 如何计算一个key的hash值呢?
return (e = getNode(hash(key), key)) == null ? null : e.value;
}
上节讲过,一个设计优秀的哈希表需要在冲突避免和冲突解决阶段都进行充分考虑,而为了减少哈希冲突发生的可能性,一个设计良好的hash函数必不可少,hashmap中提供的hash函数实现较为简单:
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
- 通过将哈希值高位右移并与原本的哈希值进行异或,从而实现高位的信息混入到低位,以增加哈希值的随机性和分布性,减少哈希碰撞的发生
- 由于哈希表的大小为2的幂次,这意味着 (tableSize - 1) 的二进制表示中低位都是1,而高位都是0。当进行哈希值与 (tableSize - 1) 的位与(AND)操作时,低位的1会决定桶的索引,因此低位对于决定桶的位置更为关键
- 这也是为什么在哈希函数中,常常会通过位运算来将高位的信息混入低位,从而减少哈希碰撞,提高哈希表的分布性。通过在哈希函数中将高位信息混入低位,可以使不同键的哈希值更有可能落在不同的桶中,减少了碰撞的概率
此处hash技巧的适用前提是建立在桶数组大小为2的n次幂前提之下的
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if (
//1. 桶数组已经初始化过了
(tab = table) != null
//2. 桶数组不为空
&& (n = tab.length) > 0 &&
//3. 借助2的n次幂这个特性,使用与运算替代取模运算,计算出当前key所在的桶下标
(first = tab[(n - 1) & hash]) != null) {
//4. 检查桶对应链接的头结点是否是目标key,如果是直接返回
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
//5. 接着判断当前桶关联的是链表还是红黑树
if ((e = first.next) != null) {
//6. 红黑树如何定位这里就不多展开了,可以直接简化为在二叉查找树种定位node的过程
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
//7. 如果关联的是链表,那么就遍历一遍链表,依次对比
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}
- Doug Lea喜欢把很多赋值逻辑直接在if语句中完成
- 首节点被提取出单独判断,是因为首节点往下判断的方式取悦于当前桶关联的是链表还是红黑树,但是访问首节点与当前桶关联的数据结构无关。
get的过程还是非常easy的,面试过程中核心考察的主要是以下两个点:
- hash function 是如何实现的,为什么能这样实现 ? —> 一切的根源在于桶数组大小是2的n次幂这个特性
- 整个get的过程
put
put的实现也很简单,下面来看看是如何实现的吧:
public V put(K key, V value) {
// hash function是如何工作的,上面已经讲过了,这里不再重复
return putVal(hash(key), key, value, false, true);
}
put的过程需要考虑以下几点:
- 首先确认要插入的key是否存在于当前桶中,如果桶关联的是链表那么遍历链表,如果关联红黑树则遍历红黑树进行查找
- 如果不存在,那么对于链表来说就是简单的尾插法,红黑树这里不展开,同时在链表插入完后,需要检查是否需要进行树化操作
- 如果存在,则执行更新操作,这里hashmap单独提供一个onlyIfAbsent标志来控制是否只期望执行插入操作,如果该标志位true,则会忽略更新步骤
- 如果执行的是插入操作,则再插入完成后 ,进入最后的扩容逻辑判断
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// 1. hashmap的桶数组是懒初始化的
if ((tab = table) == null || (n = tab.length) == 0)
// 数组懒初始化是调用的resize方法,和扩容调用的是一个方法
n = (tab = resize()).length;
// 2. 通过与预算替代取模运算,计算出当前键值对所在的桶,同时检查当前桶是否为空,如果为空直接插入即可
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
// 3. 如果桶不为空,此时要检验key是否已经存在,是否需要扩容,如果不存在,需要插入的话,桶关联的是链表还是红黑树
else {
Node<K,V> e; K k;
// 4. 和get过程一样,首先判断当前桶关联的首节点是否等于当前key
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
// 5. 如果首节点不是要找的key,在根据红黑树和链表单独分开处理
else if (p instanceof TreeNode)
// 往红黑树中插入元素的过程这里不展开,便于理解,大家可以简化为二叉查找树的插入和更新过程
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
// 6. 如果当前桶关联的是链表,那么遍历链表 -- 注意: e保存的就是我们最终找到的那个node节点
for (int binCount = 0; ; ++binCount) {
// 说明链表遍历结束了,没有找到与当前key匹配的key,说明执行插入而非更新操作
if ((e = p.next) == null) {
// 先插入到链表尾部
p.next = newNode(hash, key, value, null);
// 检查链表长度是否大于了树化阈值 -- 这里是8
// 减去1,是因为我们这里遍历链表是从链表首节点后一个节点开始进行的遍历
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
// 如果达到了树化阈值,则把链表转换为红黑树
treeifyBin(tab, hash);
break;
}
// 如果链表中存在于当前key匹配的key,则跳过循环,然后执行更新操作
// 注意: 只有执行更新操作时,e才会被赋值为目标node,执行插入操作的时候e=null
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
// 7. 如果执行的是更新操作,则e不为null
if (e != null) { // existing mapping for key
V oldValue = e.value;
// 如果onlyIfAbsent标识为true,说明我们期望执行插入操作,而非更新操作
// 如果没有onlyIfAbsent限制,或者onlyIfAbsent=false,则只需更新操作
if (!onlyIfAbsent || oldValue == null)
e.value = value;
// 回调接口 --- LinkedHashMap(哈希链表结构)会用到该接口实现排序,从而实现LRU效果
afterNodeAccess(e);
// 注意: 如果执行更新操作,这里会直接返回,跳过扩容判断
return oldValue;
}
}
// 8. 增加修改次数 -- modCount用于在迭代器遍历中检查集合是否被修改了
++modCount;
// 9. 判断是否需要扩容
if (++size > threshold)
resize();
// 还是回调接口 -- LinkedHashMap会利用该接口完成LRU淘汰相关操作
afterNodeInsertion(evict);
return null;
}
put的过程值得我们关注的有以下几个点:
- 如果执行链表尾插操作,插入完后会进行树化检查
- 如果执行更新操作,执行完后就直接return旧值返回了,如果执行插入操作,则会在插入完后进行扩容检查
- 两个回调接口的调用,hashmap本身并没有用到这两个回调接口,但是它的子类LinkedHashMap可以利用这些回调接口完成LRU的功能
关于淘汰算法LRU也是一个重点,大家感兴趣可以了解一下,leetcode上也有相关题目可以练习LRU的实现。
扩容
hashmap的resize扩容方法逻辑还是有点小复杂的,但是总体还是比较好理解的,整个过程分为两大阶段:
- 计算扩容后新的桶数组的容量,计算新的扩容阈值大小,毕竟桶数组变大了,对于的扩容阈值也需要变大
- 节点迁移,依次处理旧的桶数组中每个桶,根据桶只有一个元素,还是关联红黑树,或者关联链表三种场景进行分类处理,迁移过程中充分利用了桶数组容量为2的n次幂这个特性
由于resize函数在桶数组懒初始化和扩容两个时机处都会被调用,所以计算上述两个值的过程涉及对数组懒初始化过程的处理,这一点大家需要注意。
final Node<K,V>[] resize() {
//===========================阶段1===============================
Node<K,V>[] oldTab = table;
// 保存旧的桶数组大小和扩容阈值
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
// 用于保存新的桶数组大小和扩容阈值
int newCap, newThr = 0;
// 1. 数组懒初始化和扩容调用的都是resize方法,所以这里首先判断走的是哪个流程
if (oldCap > 0) {
// 2. 如果走的是扩容流程,那么先进行边界检查,如果桶数组容量超过了最大限制,则直接返回旧的桶数组
if (oldCap >= MAXIMUM_CAPACITY) {
// 扩容阈值设置为无限大,这样就永远不会再进入扩容流程了
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 3. 扩容是直接扩容一倍大小,这里就是先检查扩容一倍大小后是否会超过最大限制
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
// 3.1 如果我们通过hashmap构造函数指定的桶数组大小比默认初始化大小还小,那么这里计算新的扩容阈值不会直接翻倍处理
oldCap >= DEFAULT_INITIAL_CAPACITY)
// 新的扩容阈值为原来的一倍
newThr = oldThr << 1; // double threshold
}
// 4. 如果走的是数组懒初始化流程,那么判断threshold是否已经设置过了
else if (oldThr > 0) // initial capacity was placed in threshold
// 如果走的初始化流程并且threshold已经被设置过了,那么初始的数组容量就等于设置的扩容阈值
newCap = oldThr;
// 5. 走的懒初始化流程,并且threshold还没有设置
else { // zero initial threshold signifies using defaults
// 那么初始容量和扩容阈值都走默认值
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
// 6. 3.1和4场景下,会根据newCap计算新的扩容阈值
if (newThr == 0) {
// 新的扩容阈值 = 当前新数组的容量大小 * 负载因子
float ft = (float)newCap * loadFactor;
// 新的扩容阈值正常情况下为上面公式求解出来的值,如果超过最大值限制,则设置为无穷大
newThr = (
// 如果扩容后新数组大小比允许的最大值还大,那么设置新的扩容阈值为无穷大,也就是后面不会再进入扩容流程了
newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
//===========================阶段2===============================
// 上面一堆if...else都是计算新的扩容阈值和扩容后新数组的大小
threshold = newThr;
// 初始化一个新的数组,大小就是上面求解出来的新数组大小
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
// 下面就是将数据从旧的桶数组依次搬移到新的桶数组中
if (oldTab != null) {
// 依次处理每一个桶
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
// 每个桶的首元素也是单独处理 -- 此处先判断首元素是否为空
// 如果首元素不为空,再进行数据迁移,否则为空了就无需迁移了
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
// 如果当前桶只有一个首元素,那么直接重新计算当前首元素所在的桶下标,然后设置到新桶即可
if (e.next == null)
// 这里计算元素在新桶中下标的方式还是挺有意思的,利用了2的n次幂特性
// 可以直接利用与运算求解出元素在新桶中的下标
newTab[e.hash & (newCap - 1)] = e;
// 如果当前桶不止一个元素,那么根据当前桶关联的是红黑树还是链表分别处理
else if (e instanceof TreeNode)
// 如果当前桶关联的是一个红黑树,如何处理,这部分内容大家感兴趣可以自己研究一下
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
// 当前桶关联的是一个链表,那么迁移过程也就是遍历链表
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
// 这里迁移过程比较有意思,简单聊聊
// 1. 首先维护两个链表loHead 和 hiHead
// 2. loHead链表保存哈希值 & OldCap == 0的node节点 , hiHead链表保存与运算值为1的节点
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
// 3. 对于计算得到与运算结果为0的这条链表,将这条链表直接挂载到新的桶数组第j个桶,也就是和之前桶下标一致
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
// 4. 对于计算得到与运算结果为1的这条链表,将这条链表直接挂载到新的桶数组第j+oldCap个桶
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
扩容流程整体分为两个阶段:
1,计算新的桶数组大小和新的扩容阈值 , 该阶段的整体流程如下:
- 如果走的是扩容逻辑, 先判断是否越界,如果越界设置扩容阈值为无穷大,这样后面就不会再进入扩容流程了;
- 如果走的扩容逻辑,并且没越界,那么newCap为oldCap的一倍,但是如果oldCap <= 16(DEFAULT_INITIAL_CAPACITY),那么newThr = newCap * loadFactor ,否则为旧扩容阈值的一倍。
为什么oldCap<=16的时候,新的扩容阈值计算逻辑要区分开来呢?而不是说新的扩容阈值 = newCap * loadFactor呢?
16 是hashmap默认的桶数组初始化大小,我猜测可能是因为小于16的范围内 newThr = newCap * loadFactor = oldThr << 1
这套恒等式存在不成立的情况,所以拎出来单独进行处理,如:
loadFactor = 0.75
oldCap = 2 , oldThr = 1 , newCap = 4
newCap * 0.75 = 3
oldThr << 1 = 2
很显然此时恒等式不相等
这里只是说存在恒等式不成立的情况,不是说<=16的时候,这套恒等式就不成立,如果大家推演后,会发现当cap从4开始的时候,恒等式就已经开始成立了。
在oldCap > 16的情况下 ,newThr = newCap * loadFactor = oldThr << 1
这套恒等式都是成立的,所以这里也是再次使用位移优化了乘法运算,只能说真的绝!
- 如果走的是桶数组懒初始化逻辑,根据我们选择的hashmap构造函数的不同,会走入两条路径,一条是我们选择了如下的构造函数,该构造函数中计算好了cap, 但是threshold还没有计算
(不要被这里赋值给threshold迷惑了,这里threshold实际保存的是newCap的值,threshold的值会在resize后面被赋值为newCap * loadFactor)
public HashMap(int initialCapacity, float loadFactor) {
...
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}
static final int tableSizeFor(int cap) {
int n = -1 >>> Integer.numberOfLeadingZeros(cap - 1);
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
- 另一种就是选择了空参构造函数,该构造函数中没有提前计算出cap和threshold,此时resize函数中都会赋予默认值
public HashMap() {
this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
}
2, 数据迁移遍历旧桶数组中每个桶,然后根据桶中只有一个元素,桶关联链表,桶关联红黑树三种情况进行分别迁移处理,具体迁移过程大家可以参考上面的代码注释理解,重点关注迁移过程中是如何利用到了桶数组2的n次幂这个特性就可以了
ConcurrentHashMap
HashMap本身是线程不安全的,这一点相信各位都清楚,而ConcurrentHashMap这是HashMap的线程安全版本,本节我们就来看看jdk 8之后的ConcurrentHashMap是如何实现的。
ConcurrentHashMap实现的讲解还是分为三个阶段:
- get
- put
- 扩容
在正式进入源码讲解前,我们先来看看这三个阶段都会涉及到的动作:
- 计算key的哈希值 — spread函数
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
和hashmap的hash方法一样,通过异或操作将高位的信息混合到低位中,增加了哈希值的随机性。但是和hashmap不同之处在于,此处多了一个&0x7fffffff的操作,这又是干啥的呢?
- HASH_BITS 是一个常量,它的值是 0x7fffffff,表示一个非负整数的最高位是0,其余位都是1
0111111111111111111111111111111
这是一个用来掩码操作的值,它可以确保结果是非负的,并且在进行桶索引计算时,不会超出哈希表的范围。
get
ConcurrentHashMap的get流程和HashMap基本一致,并且整个get的过程是没有加锁的:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算key的哈希值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
// 先获取key所在桶
(e = tabAt(tab, (n - 1) & h)) != null) {
// 单独处理桶的首元素
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash 为负数表示该 bin 在扩容中或是 treebin, 这时调用 find 方法来查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 否则说明当前桶关联着链表,那么遍历链表,挨个对比
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
既然ConcurrentHashMap的get方法没有加锁,那么多线程情况下又是如何确保数据读取一致性的呢?
- 这个问题等我们看完ConcurrentHashMap的put写入方法后,再来细说
put
从get方法的实现可以看出,ConcurrentHashMap的读过程是不加锁的,那么put写入方法需不需要加锁呢?如果需要加锁,这锁又是如何加上的呢?
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 1.计算得到当前key的哈希值
int hash = spread(key.hashCode());
int binCount = 0;
// 2. 死循环 -- 便于桶为空时,不断循环cas设置桶中第一个键值对
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
// 3. 桶数组还没有初始化,先执行初始化流程
if (tab == null || (n = tab.length) == 0)
// 初始化过程也是不断cas重试 -- 当然是内部存在一个while循环不断cas尝试重试
// 直到当前线程把桶数组初始化完毕,或者其他线程完成了初始化工作
tab = initTable();
// 4. 定位当前key所在桶,同时判断桶中首元素是否为null -- 首元素单独处理
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果桶为空,那么直接将当前键值对设置进桶里即可
// 这里采用CAS尝试进行设置,成功了则break跳出循环,否则不断循环尝试cas
// 直到自己设置成功,或者其他线程设置成功
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
// 5. 如果当前哈希表处在扩容阶段,那么当前线程会加入帮忙完成剩余扩容流程
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 6. 如果首元素与当前key匹配,并且我们要求执行插入操作,那么这里可以直接中断返回了
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
// 7. 首元素不匹配并且当前哈希表不处于扩容阶段,那么根据当前桶关联的是链表还是红黑树,走不同的插入更新逻辑
else {
V oldVal = null;
// 锁住当前桶所在的头结点
synchronized (f) {
// 需要确保此时的头结点没有因为扩容而移走
// 确保一开始拿到的当前桶的头结点和当前桶的头结点仍然是一个
if (tabAt(tab, i) == f) {
// 头结点的哈希值大于等于0,说明当前桶上挂载的是一个链表
if (fh >= 0) {
// 遍历链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 判断key是否已经存在于链表中
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 如果key存在,那么判断当前是否只期望发生插入操作
// 如果是,则忽略更新步骤,否则执行更新val的过程
if (!onlyIfAbsent)
e.val = value;
// 如果发现了重复key,则再完成更新后跳出循环
break;
}
// 如果链表遍历结束了,也没发现存在重复key
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 将键值对尾插到当前链表中
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
// 如果当前桶挂载的是一个红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 向红黑树中插入当前键值对 -- 返回旧的值
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
// 如果返回的旧值不为nil,说明当前key在红黑树中存在
// 那么是否执行更新操作,还需要根据OnlyIfAbsent标志进行判断
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
// 如果当前桶挂载的是链表,判断是否需要进行树化
// 如果执行的是更新操作,则直接return
if (binCount != 0) {
// 只有当前桶挂载的是链表并且执行的是插入操作时,binCount才有可能会大于等于树化阈值
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//如果执行的是更新操作,oldVal不为nil,则直接return返回
if (oldVal != null)
return oldVal;
break;
}
}
}
// 8. 执行完插入操作后,判断是否需要扩容
addCount(1L, binCount);
return null;
}
在整个put流程中,用到了两个并发控制技巧:
- 当桶数组还未初始化时,和设置桶中第一个元素时,都采用不断cas重试完成,而没有加锁
- 当尝试往桶装载的链表或者红黑树中执行插入或者更新操作时,则采用synchronized锁住当前桶头节点的方式完成临界区资源的保护
那么下面我们再来看看判断是否需要扩容的流程又是如何实现的呢?简单来说没整体流程如下:
- 累加count计数器
- 判断是否需要扩容
如何在并发场景下完成计数器的累加操作呢?加锁 or cas or 其他方式,ConcurrentHashMap这里的实现方式还是非常值得我们学习的:
private final void addCount(long x, int check) {
//==================累加count计数器=====================
CounterCell[] cs; long b, s;
// cells数组会在第一次发生竞争时被懒初始化
if ((cs = counterCells) != null ||
// 如果到目前为止未发生竞争,则cells数组为空,此时直接把计数累加到baseCount上
!U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell c; long v; int m;
boolean uncontended = true;
// 如果cells数组被初始化了,或者说cas设置baseCount值时发生了竞争,cas失败的线程进入该段逻辑
if (
// cells数组还没有初始化则进入fllAddCount函数,完成cells数组的初始化逻辑,同时包括累加计数到cell的过程
cs == null || (m = cs.length - 1) < 0 ||
// 如果当前线程所要累加的cell还未初始化,同样进入fullAddCount函数,完成初始化和累加逻辑
(c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
// 如果当前线程所要累加的cell初始化了,但是cas尝试累加上去的过程中又发生了竞争,则进入fullAddCount逻辑,通过不断cas循环完成累加
!(uncontended =
U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
// 核心就是不断cas循环,完成累加
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 累加计数已经成功,下面就是计算当前count总和
s = sumCount();
}
//==========判断是否需要扩容=====================
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 扩容过程也会产生竞争,因此依然需要while循环重试,直到扩容工作完成
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// n此时被赋值为了当前桶数组大小
int rs = resizeStamp(n);
// sc=sizeCtl,如果sizeCtl < 0 ,则表示当前哈希表正在初始化或者扩容中
if (sc < 0) {
// 判断是否需要帮忙扩容,例如: 参与帮忙扩容线程过多的时候,则无需再帮忙了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果当前哈希表正在扩容,则帮忙扩容
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 尝试对哈希表进行扩容
else if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
fullAddCount函数的实现主要分为以下几个阶段:
- 获取当前线程的hashcode
- 如果当前cells数组还没有初始化,则先初始化cells数组,如果cells数组初始化过程也有竞争问题,则将计数累加到baseCount , 如果两个过程都产生了竞争,则进入下一轮的cas重试 (外层还有一个死循环套着呢)
- 如果当前线程所要累加的cell数组还未初始化,则执行初始化流程
- 既然当前线程所要累加的cell已经初始化完毕了,则通过cas配合重试完成累加过程,这个过程中如果有机会,也会判断是否需要对cell数组进行扩容
整个计数器累加阶段运用到了很多并发设计思想,值得我们学习的有以下几个:
- 采用分治思想,进行分段累加,然后合并汇总,减少了竞争
- cas配合重试实现计数器的原子累加
- 单变量配合cas加重试实现一个简单的锁
扩容
讲解ConcurrentHashMap扩容源码前,各位不妨先思考一下hashmap的扩容流程在并发场景下存在哪些问题?又该如何解决这些问题呢?
- hashmap的扩容分为两个阶段: 计算newCap和newThr 和 数据迁移
- 这里并发竞争的核心在于数据迁移,因为ConcurrentHashMap支持多个线程共同完成迁移,那么这个共同协作的过程又是如何实现的呢?
多线程共同完成数据迁移用到的其实还是分而治之的思想,如下图所示:
每个线程每次会分配到一个固定大小的迁移区间,然后当处理完本次迁移区间内的所有数据迁移任务后,会进入下一轮循环,如果还有待迁移数据,则会再次分配给当前线程一批待迁移区间。
通过分而治之处理的思想,实现多线程共同协作迁移数据的过程,具体代码如下所示,总体而言分为三个阶段:
- 每轮循环,分配给当前线程一个待迁移区间
- 如果全部数据迁移完毕了,则结束迁移任务
- 否则当前线程负责完成分配给自己的迁移区间的迁移任务
- 当前线程一次迁移一个桶,迁移完后递减 i 指针,判断 i >= bound, 如果条件满足说明当前迁移区间还有桶没有处理,则继续处理
- 否则说明分配给当前线程的迁移区间已经处理完毕了,则进入下一个分配循环
- 然后重复此过程,直到所有数据迁移完毕
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 因为涉及多线程共同帮忙扩容,因此每个线程有自己固定的迁移步长
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果传入的新的桶数组为空,首先初始化新的桶数组
if (nextTab == null) { // initiating
try {
// 新的桶数组大小为原来的一倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 数据迁移是从后往前进行迁移,transferIndex表示当前待迁移的起始下标
transferIndex = n;
}
// 获取新的桶数组大小
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 如果还有待迁移数据,那么当前线程每轮循环都会分配到一个迁移区间,当前线程处理完这批数据后
// 进入下一轮循环,判断是否还有没迁移完的数据,如果有则再分配给它一个待迁移区间,然后重复此过程
// 直到全部数据迁移完毕
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// ========== 1. 每轮循环,分配给当前线程一个待迁移区间 ===============
// 如果找到一个待迁移区间,则通过cas不断重试尝试更新tansferIndex的值
while (advance) {
// nextIndex: 下一个待迁移的区间起始下标
// nextBound: 当前线程负责迁移区间的起点下标
int nextIndex, nextBound;
// 如果已经分配给当前线程一个迁移区间,此处--i是去迁移下一个桶,因为线程是一次迁移一个桶
// 如果--i < bound,说明当前线程已经处理完毕了分配给自己的迁移区间,那么进入下面分配新的迁移区间的逻辑
if (--i >= bound ||
// finishing标识设置了,说明所有数据都迁移完毕了
finishing)
advance = false;
// 如果所有桶都完成了数据迁移,则结束迁移工作
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 如果还没有迁移完,那么计算下一个待迁移的区间,交给当前线程来迁移
// 如果存在多个线程同时计算自己的待迁移区间,由于设置对共享遍历transferIndex的修改,
//所以需要使用cas配合外层循环不断重试完成
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
// 如果剩余待迁移区域大小不足stride,说明没有固定的迁移步长
// 那么剩余待迁移数据都交给当前线程负责迁移
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// bound表示当前线程负责迁移数据的起始下标
bound = nextBound;
// i表示待迁移区间的结束下标
i = nextIndex - 1;
advance = false;
}
}
// ============= 2. 如果全部数据迁移完毕了,则进入下面这段逻辑 =============
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果数据都迁移完毕了,则设置table指向新的桶数组,然后返回
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 设置迁移结束标识
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// =============3.当前线程负责完成分配给自己的迁移区间的迁移任务==========
// 迁移过程中可能会遇到如下三个场景
// 1. 当前桶本身就没有节点,此时将当前桶的头结点设置为forwardingNode,表示当前桶处理完毕
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 2. 如果当前迁移区间已经处理过了,则跳过处理进入下一轮迁移区间分配
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 3. 当前桶有数据需要迁移,进入真正的数据迁移阶段
else {
// 要迁移当前桶内数据,会锁住当前桶的头结点
synchronized (f) {
// 迁移方向是从迁移区间的最右边往最左边进行迁移处理,因此先定位最右边的桶
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh大于0,说明当前桶上挂载的是链表,处理链表到对于新桶的数据迁移工作
if (fh >= 0) {
// 对于链表的迁移过程,和hashmap思想类似
// 将链表分为两条: 一条链表上链接的节点在新的桶数组下标和旧的桶数组下标一致
// 另一部分链表上的节点在新的桶数组下标=旧桶数组下标+n
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 一条链表在新桶的下标位置和旧桶一致
setTabAt(nextTab, i, ln);
// 另一条链表在新桶的下标位置=旧桶下标+n
setTabAt(nextTab, i + n, hn);
// 当前桶迁移完毕,将当前桶的头结点设置为forwardingNode
setTabAt(tab, i, fwd);
// 当前线程已经处理完了本次分配自己的迁移区间的迁移任务,设置advance标识为true
// 进入下一轮迁移区间分配
advance = true;
}
// 如果当前桶挂载的是红黑树,如果完成迁移工作,这里不多展开
else if (f instanceof TreeBin) {
....
}
}
}
}
}
}
扩容逻辑这段代码,我们学习到什么并发设计技巧呢?
- 核心还是cas加重试,实现对关键共享标识的设置,这里的关键共享标识就是transferIndex
- 线程单次迁移一个桶,当前线程处理某个桶的迁移工作时,会锁住当前桶的头结点,当前桶迁移完毕后,会桶的头结点设置为forwardingNode
为什么都是对共享资源的保护,一个采用cas+重试,一个采用加锁呢?
- 关键看对共享资源锁时间的长短,如果较长的话,那么cas大部分时间都在空转消耗CPU,不如把对应线程挂起等待,让出CPU资源
协作扩容
上面我们看了扩容部分的源码,总体而来,有点小复杂,除了该扩容函数之外,其他函数的实现还是很容易理解,下面我们就来看看协作扩容的方法源码是如何实现的吧。
helpTransfer方法的第二个参数是正在执行扩容的桶的头结点,如putVal方法中如下这段源码:
...
// hash(key) & cap后发现所在桶正在执行扩容逻辑
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
...
helpTransfer方法实现比较简单,但是正式加入扩容小分队前,需要再三确认扩容还在继续,如果扩容过程已经结束了,则直接返回:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 确保扩容还未结束
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 这里重点关注transferIndex,如果transferIndex<=0说明全部数据都完成了迁移操作
// 那么这里可以跳出循环,然后返回新的桶数组
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 如果扩容还在继续,则加入帮忙扩容
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
读为什么可以不加锁?
ConcurrentHashMap中对共享数据结构或者共享变量很多读操作都没有加锁:
- get方法没有加锁
- sumCount累加计数方法中读取计数器的过程中也没有加锁
final long sumCount() {
CounterCell[] cs = counterCells;
long sum = baseCount;
if (cs != null) {
for (CounterCell c : cs)
if (c != null)
sum += c.value;
}
return sum;
}
大家思考一下,为什么可以不加锁呢?或者反向思考一下,同时存在对一个共享变量的读写操作会发生什么问题呢?
我们都应该学习过计组,计组课程中一定讲过CPU的多级缓存体系,通常每个CPU的L1,L2缓存是不共享的,L3缓存是共享的,通过缓存最长使用的数据,减少从主存读写数据的次数,从而避免CPU花费较多时钟周期等待从主存读写数据。
缓存的好处很多,弊端就是会带来缓存一致性问题,如果CPU读写数据面向缓存,那么缓存中的脏数据何时刷新回主存则会直接影响其他CPU是否能够及时读取到最新数据,如果不能则会产生脏读问题。同时CPU和编译器为了提高效率,很可能会执行指令重排序,从而导致多线程场景下的数据读写并非按照预期的顺序序列执行。
为了解决上面两个问题,硬件层面给出了不同的强弱内存模型保证,强内存模型通常可以保证程序执行的全局顺序性,这里更多针对的是写操作的全局顺序性。而弱内存模式会把是否强制顺序性这个要求交由程序员决定,CPU不会保证这个顺序模型,程序员必须主动要求插入内存屏障指令来强化这个"可见性"。
常见的内存模型分为以下几类:
强内存模型会对程序执行的全局顺序性有着更强的保证,当前这种保证是建立在减少了大量CPU和编译器重排优化,以及通过缓存一致性协议确保高速缓存写操作及时传播到其他CPU的方式下完成的,这些方式会极大降低CPU运行效率。站在处理器角度来看,它们希望内存模型有更少的束缚,从而使得它们可以尽可能多的做些优化来提高性能。
不同的架构的物理机器,可以拥有不一样的内存模型,而我们知道Java的宗旨就是: 一次编译,到处运行。所以为了屏蔽硬件层面不同内存模型的差异,Java提供了语言层面的内存模型,也就是常说的Java内存模型(JMM)。JMM本身不对所有场景提供顺序一致性保证,只对默认提供的8种Happens-Before场景提供保证,也就是说在这8个Happens-Before场景下,JVM会暗地里通过添加内存屏障指令来限制编译器和处理器重排序和强制处理器从主存读写数据,依次实现顺序一致性保证。
由于各种处理器的内存模型强弱不同,为了在不同处理器平台向程序员展示一个一致的内存模型,JMM在不同的处理器中需要插入的内存屏障的数量和种类也不同。
而violate关键字是8个Happens-Before规则下的一个,JMM会确保violate变量的写操作对violate变量的读操作可见。
sumCount累加计数方法中涉及到的共享变量都加上了violate关键字,所以其他线程对violate变量的写操作对当前读线程来说是立即可见的:
private transient volatile long baseCount;
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
ConcurrentHashMap的get过程无需加锁的原因就在于其Node节点的val和next属性都是被violate关键字修饰的,原理同上:
static class Node<K,V> implements Map.Entry<K,V> {
// hash和key是final不可变的,既然不会再有写操作发生,那么可见性和有序性问题也就无所谓了
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
...
所以我们能从中学到什么并发设计技巧呢?
- 当需要对共享变量执行读写操作时,可以考虑使用violate关键字修饰该共享变量,这样读取的时候就无需加锁保护了,但是写变量的时候还是需要考虑使用cas配合重试,或者加锁来避免写写冲突。
- 具体是选择cas配合重试,还是加锁,主要看锁变量的时间有多长,短则cas , 长则加锁
ThreadLocalMap
ThreadLocal相信搞Java的同鞋都用过,其本身作为应用程序访问Thread内部ThreadLocalMap的一个转发器,用于实现访问线程本地存储的功能,本节我们重点来看看ThreadLocalMap的实现中有哪些值得我们学习的技巧。
之前写过一篇对ThreadLocal设计思想的剖析,感兴趣可以看看:
- 个人谈谈对ThreadLocal内存泄露的理解
ThreadLocalMap的讲解分为四部分:
- get
- put
- remove
- 扩容
get
在正式讲解get方法实现前,我们需要明确一点,那就是ThreadLocalMap中key类型是ThreadLocal,val类型是Object:
static class Entry extends WeakReference<ThreadLocal<?>> {
// value是Obejct类型
Object value;
// key是ThreadLocal类型 -- 严格来说key是被弱引用包裹的ThreadLocal类型
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// 桶数组
private Entry[] table;
为什么ThreadLocalMap中的key要被弱引用包裹,这一点下面会提到,或者可以阅读我上面给出的文章链接,提前获知答案。ok,下面我们继续来看看get方法的实现:
private Entry getEntry(ThreadLocal<?> key) {
// key的hash值这里就是threadLocal本身的hashCode
// threadLocalMap桶数组大小要求必须是2的n次幂,所以这里也用到了与运算替代取模运算的技巧
int i = key.threadLocalHashCode & (table.length - 1);
// 定位指定桶
Entry e = table[i];
// 判断key是否相等,如果相等直接返回
if (e != null && e.get() == key)
return e;
else
// 否则怎么办呢 ?
return getEntryAfterMiss(key, i, e);
}
ThreadLocalMap对哈希冲突处理并不是采用链式哈希的处理思路,而是走了简单的线性探测法实现,具体如下:
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
// 采用线性探测法依次往后寻找,遍历到null桶时结束遍历
while (e != null) {
// 如果找到了,直接返回
ThreadLocal<?> k = e.get();
if (k == key)
return e;
// 如果扫描到过期键值对
if (k == null)
expungeStaleEntry(i);
// 继续往后探测
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
// 从当前桶开始往后遍历,直到遍历到null结束遍历
// 遍历到最后一个桶时,会折返到桶的开头开始遍历
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
ThreadLocalMap线性探测法实现的整体思路和理论所述整体差别不大,但是由于其key采用的是弱引用实现,所以当失去了外部应用程序的强引用后,可能会在下一次垃圾回收扫描过程中被回收掉,那么如果线性探测的过程中遇到了被回收的空entry时,此时又该做什么呢?
- 我们知道线性探测法实现简单,但是容易产生大量冲突聚集,因此当执行delete操作时,如果我们不对桶数组中的键值对进行重组,对查询效率会产生一定的影响
- 并且由于ThreadLocalMap本身的特殊场景,其中部分key会因为强引用的丢失而被回收掉,此时这些过期键值对也会对查询产生性能影响
因此ThreadLocalMap中,当遇到了被回收的键值对时,会对当前桶数组进行一定的重组,从发现空桶的下标处开始,往后线性查找,每遇到一个因为产生哈希碰撞而未按照预期待在自己位置上的键值对,就重新计算,把该键值对尽量移动到靠近它原本位置的下标处:
此处空桶指的是过期键值对所在桶
以上重组过程由expungeStaleEntry负责完成,该重组方式也被称为探测式清理:
- 从开始位置向后遍历,清除过期元素,将遍历到的过期数据的 Entry 设置为 null ,同时释放size空间
- 沿途碰到的未过期的数据则将其 rehash 后重新在 table 中定位
- 如果定位到的位置有数据则往后遍历找到第一个 Entry=null 的位置存入
- 接着继续往后检查过期数据,直到遇到null桶才终止探测。
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
// 首先将当前桶关联的键值对清空
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
// 从当前空桶开始向后查找
for (i = nextIndex(staleSlot, len);
// 遇到NULL时结束
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 向后探测过程中再次遇到空桶,则执行清空操作
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
// 否则计算当前key原本应该放在哪个桶中
int h = k.threadLocalHashCode & (len - 1);
// 如果和当前桶坐标不一致,说明当时放置该键值对时产生了哈希碰撞
// 尝试将当前元素尽量移动到离他原本位置近的位置
if (h != i) {
// 将当前桶清空
tab[i] = null;
// 从当前键值对本该在的位置,往后探测,直到找到一个NULL桶
// 此时查找范围会被限制在[h,i],因为上面把tab[i]已经设置为null了,这一点要注意
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
核心思想: 让每个元素离自己原本应该待的位置更近一些 , 这也是线性探测法需要核心思考的一点!!!
put
上面解析get方法实现时也说到了ThreadLocalMap采用的是线性探测法解决的哈希冲突,put操作的整体流程也是先看目标位置有无人占着,如果有就继续往后找,直到找到一个NULL桶或者空桶:
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
// 定位桶下标
int i = key.threadLocalHashCode & (len-1);
// 从当前桶开始往后遍历,直到遍历到空桶结束遍历,遍历到最后一个桶时,会折返到桶的开头开始遍历
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 找到key相等的了,则执行更新操作,然后return
if (k == key) {
e.value = value;
return;
}
// 因为key是弱引用,那必然是失去了强引用,被垃圾回收掉了
// 可以将当前空桶占着,但是这里并不是简单的直接占着,下面会讲
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 线性向后探测的过程中没有发现有相等key和空桶
// 但是发现了null桶,所以此处就占着这个null桶吧
tab[i] = new Entry(key, value);
int sz = ++size;
// 执行启发式清理失败的情况下,并且此时已经达到扩容阈值了,则执行扩容
// 如果启发式清理执行成功,说明至少清理了超过一个过期键值对占用的空间
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
当put方法发生哈希碰撞后,在执行线性探测的过程中如果遇到了空桶,最简单的方式就是把这个空桶占着然后直接返回,但是这里还需要考虑顺带做一下重组过程,清理一下过期的键值对:
- 向前搜索的过程中发现了空桶
- 向前搜索过程中没有发现空桶
replaceStaleEntry函数整体过程实现,如上所示,大家可以结合图示和代码注释进行理解:
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
int slotToExpunge = staleSlot;
// 从当前空桶位置向前寻找,直到找到第一个空桶为止,或者遇到了null桶
// 如果遍历到第一个桶,则跳到最后一个桶的位置处
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
// 如果遇到了空桶,更新slotToExpunge为向前找到的一个空桶下标
if (e.get() == null)
slotToExpunge = i;
// 从当前空桶位置向后查找,遇到空桶则停下
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 发现存在目标key--但是很显然此时目标key因为哈希碰撞缘故,并不在原本的位置上
if (k == key) {
// 执行更新操作
e.value = value;
// 线性探测一般都是向后找一个空位占着,但是这里先是向前探测找到一个空桶
// 然后占着这个空位
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 如果向前搜索的过程中没有发现空桶,由于staleSlot下标一开始指向的就是空桶
// 经过上面swap操作后,下标i指向的变成了空桶,所以更新启发式清理起始坐标为i
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 开启启发式清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
// 如果向前搜索的过程中没有发现空桶,但是向后探测过程发现了空桶,则记录空桶最开始出现的下标
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// If key not found, put new entry in stale slot
// 说明我们需要执行插入而非更新操作
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
// 如果slotToExpunge不等于原先的staleSlot指针了,那么说明slotToExpunge一个空桶的下标
// 既然存在过期键值对,那么就执行启发式清理,以slotToExpunge为起点
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
cleanSomeSlots函数负责执行启发式清理:
- 向后遍历 Log(2n) 个位置,下标 i 作为遍历的第一个位置。遍历中遇到过期键值对时(假设该位置为 i ),同步调用 expungeStaleEntry(i) 方法
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
// 执行探测式清理,同时设置removed标识为true,表明至少清空了一个过期键值对占用的空间
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
启发式清理在遇到null桶时,就结束本次清理工作。而启发式清理则会进行多轮探测式清理过程,所以清理更彻底一些。
这里清理不仅仅是清理过期键值对,释放size空间,更重要的一点时重新rehash部分元素,让其能够离自己目标位置更近一些。
扩容
ThreadLocalMap的扩容方法rehash并不是直接就进行扩容,由于其桶数组桶可能充斥大量过期键值对,这些过期键值对虽然可以视为空桶,但是其会占用桶数组大小的size空间,因此扩容前,需要先彻底清理掉所有过期键值对,然后再判断此时空间是否够,如果不够再扩容:
/**
* Re-pack and/or re-size the table. First scan the entire
* table removing stale entries. If this doesn't sufficiently
* shrink the size of the table, double the table size.
*/
private void rehash() {
// 清理掉所有过期键值对
expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis
// 如果还是达到了扩容阈值
if (size >= threshold - threshold / 4)
// 再执行扩容
resize();
}
/**
* Expunge all stale entries in the table.
*/
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
// 每当发现一个空桶就执行一次探测式清理
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}
探测式清理不仅负责帮助清理过期键值对,释放size空间,同时还会尽量将元素归位到靠近它自己的位置处,使得get的效率可以更高一些。
真正的扩容resize方法实现比较简单,扩容逻辑简单粗暴,就是翻一倍:
/**
* Double the capacity of the table.
*/
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 扩容大小为旧数组的两倍
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
// 遍历旧数组中每个桶
for (Entry e : oldTab) {
if (e != null) {
ThreadLocal<?> k = e.get();
// 过期建清理
if (k == null) {
e.value = null; // Help the GC
} else {
// 非过期建则计算其在新桶中的下标
int h = k.threadLocalHashCode & (newLen - 1);
// 如果发生了哈希碰撞,还是采用线性探测法
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
// 设置新的扩容阈值
setThreshold(newLen);
size = count;
table = newTab;
}
扩容阈值为当前新数组容量的2/3:
/**
* Set the resize threshold to maintain at worst a 2/3 load factor.
*/
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
delete
delete删除元素的代码实现还是比较简单的,还是采用线性探测法看看对应的key是否存在,如果存在就清理掉,同时执行一次探测式清理,清理部分过期建,让部分元素离自己原本位置更近一些:
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
// 定位所在桶,如果存在哈希碰撞,则线性探测往后查询
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
// 找到目标key
if (e.get() == key) {
e.clear();
// 执行一次探测式清理
expungeStaleEntry(i);
return;
}
}
}
为什么遍历到null桶就可以判断key不存在?
对于ThreadLocalMap而言,其get,put,delete等操作都是遍历到null桶时就判断所要找的key是不存在的,那么为什么可以直接这样判定呢?
- 因为每次执行完get,put,delete等操作后,都会执行1到n轮探测式清理,探测式清理过程中会将扫描到的过期元素设置为null,同时将那些不在自己位置上的元素,rehash到最接近原始位置处。
所以经过探测式清理过后,不可能会出现如下情况:
ThreadLocalMap为什么不需要锁?
首先,在ThreadLocalMap的使用场景下不存在线性安全问题,因为ThreadLocalMap只存在单线程访问,因为ThreadLocalMap是作为每个线程自己的线程本地存储空间,当然只会由当前线程自己进行读写。
ThreadLocal只是一个转发器!!!
为什么ThreadLocalMap要采用线性探测法?
通常情况下,我们不会向线程本地存储中存放太多内容,那么哈希碰撞的概率就比较小,所以考虑到线性探测法对于缓存的友好性,并且实现简单,所以我猜测这是其中的一部分原因。
小结
由于Java 篇哈希数据结构所占篇幅较大,所以决定将Go和Redis的哈希数据结构讨论移到下一篇文章进行讲解。
本文大部分为笔者个人观点,如有不正确,欢迎评论区留言或者私信与我讨论。