摘要
介绍线程安全集合类 ConcurrentHashMap 源码,包括扩容,协助扩容,红黑树节点读写线程同步,插入元素后累加键值对数量操作原子性实现。
1 成员变量及其对应的数据结构
底层由数组+红黑树+链表实现
volatile long baseCount 和 volatile CounterCell[] counterCells; 记录map中键值对数量,更新逻辑和 LongAdder一致,volatile int cellsBusy 标记 counterCells 数组结构是否正在变化(塞入新ConterCell 或者数组扩容)
sizeCtl 不同取值对应的场景:
1 在未真正初始化之前保存的是数组长度,初始化后保存的是扩容阈值
2 ==0 代表未初始化
3 -1 代表数组正在初始化 在 initTable 方法中会通过CAS更新值为-1
4 小于-1 代表数组正在扩容转移,有 sizeCtl-1 个线程正在处理转移操作
Node 槽节点父类,其hashcode 取值范围如下
MOVED 代表节点正在转移,当前线程可以参与协助转移
TREEBIN 槽节点为红黑树,红黑树节点类为 TreeNode ,TreeBin对其进行了一层封装,扩展了红黑树双向链表以及 读写同步方法
RESERVED 当调用 compute 方法 (compute|computerifAbsent) 会暂时跟新对应槽为 ReservationNode 代表正在调用 mappingFunction 根据key计算value后,会替换 ReservationNode 对象为mapping Node对象
作用-> 接口暴露的方法源码
构造方法
可以传递 loadFactor initialCapacity 只是用于计算数组长度,不能修改map的负载因子
增
不允许key 和value 为null
1、检查key/value是否为空,如果为空,则抛异常,否则进行2
2、进入for死循环,进行3
3、检查table是否初始化了,如果没有,则调用initTable()进行初始化然后进行 2,否则进行4
4、根据key的hash值计算出其应该在table中储存的位置i(计算hashcode的时候会将高16bit和低16bit进行或运算,将高位的差异带到低位减少hash冲突,散列更均匀),取出table[i]的节点用f表示。 根据f的不同有如下三种情况:
1)如果table[i]==null(即该位置的节点为空,没有发生碰撞),则利用CAS操作直接存储在该位置,如果CAS操作成功则退出,否则继续下一次循环。
2)如果table[i]!=null(即该位置已经有其它节点,发生碰撞),碰撞处理也有两种情况
2.1)检查table[i]的节点的hash是否等于MOVED,如果等于,则检测到正在扩容,则帮助其扩容
2.2)说明table[i]的节点的hash值不等于MOVED,给当前槽上锁synchronized ,重新获取该槽的对象 ,doublecheck 是否和上锁前获取的同一对象,如果是
如果table[i]为链表节点,对应的hashcode 大于等于0,则将此节点插入链表中即可(插入过程中如果存在key相等节点,直接替换原有值)
如果table[i]为树节点 ,根据槽节点类型判断 TreeBin ,则将此节点插入树中即可。插入成功后,进行 5 为避免多线程竞争,会将当槽加synchronized 锁
5、如果table[i]的节点是链表节点,根据 binCount 判断,红黑树 binCount == 2 只是作为一个标记,链表binCount值为插入元素后槽节点中键值对的数量,则检查table的第i个位置的链表是否需要转化为数(链表插入元素后该槽包含的mapping数量如果超过8,当前槽转换为红黑树 treeifyBin),如果需要则调用treeifyBin函数进行转化
6、上一步骤如果成功,调用 addCount 更新红黑树mapping统计数据
java.util.concurrent.ConcurrentHashMap#addCount 塞入数据后更新map键值对数量
// 增加元素计数,并且如果哈希表太小且尚未处于扩容状态,就会启动扩容转移操作。
// 如果已经在进行扩容转移操作,那么在有工作可做的情况下,会协助执行扩容转移任务。
// 在一次扩容转移操作完成后,会重新检查元素数量,以查看是否由于扩容操作滞后于元素添加操作而已经需要进行另一次扩容。
1 使用LongAdder的方式更新键值对数量,成功后调用 java.util.concurrent.ConcurrentHashMap#sumCount 获取mapping数量的时候
2 判断决定是否扩容,当前map键值对数量是否超过阈值,如果是进行扩容
3 根据数组长度计算 resizeStamp 因为数组长度是*2增长,所以每个stamp 不会重复,第16bit 一定是1
4 如果 sizeCtl 小于0 代表正在扩容,进行协助扩容
4.1 判断当前扩容状态是否需要扩容
((sc >>> RESIZE_STAMP_SHIFT) != rs // sc 小于0,代表正在扩容,分为高16(固定值 resizeStamp) 和底16(执行任务转移的线程数量+1)两个部分 正常情况下,参与转移任务的线程会+1(首次执行transfer的线程会+2 剩余一个用于标识状态)转移任务
// 执行完成后-1 ,正常情况下 不会影响高16数据,如果当前线程在执行扩容时候是延后的线程,比如时间a 进行扩容完成后,会分别更新sizectrl 和 table 可能存在这样的情况,当前线程获取的 sizeCtl 和 table 不是同一个数组的数据
// 或者 另外线程进行扩容完成后,再次进入while循环,此时任然 s >= (long) (sc = sizeCtl) ,进行第二次扩容,但是慢的线程还在第一次,导致 高16bit不相等
|| sc == rs + 1 // 最先开始调用transfer进行扩容的线程会+2 ,当所有任务分配给线程并完成后,每个线程在退出transfer方法时会 -1 ,这样最后一个线程退出后,当剩余1 所以这里使用+1判断
|| sc == rs + MAX_RESIZERS // 超过了最大的helper线程数 实际是 MAX_RESIZERS -1 因为第一个执行扩容操作的线程 sizeCtl 是+2
|| (nt = nextTable) == null // transfer 执行完成后会将扩容后的数组赋值为 table,并将原来的 nextTable 赋值为null 或者 nextTable 还未初始化(第一个扩容线程还未初始化完成)
|| transferIndex <= 0) // 整个数据被转移完后 transferIndex会被变成小于等于0并保留直到下一次转移
4.2 CAS更新 sizeCtl 低16bit +1 代表当前线程协助扩容,增加协助扩容线程数量
4.3 调用 java.util.concurrent.ConcurrentHashMap#transfer 协助扩容
5 如果 sizeCtl 大于等于0 ,CAS更新 sizeCtl 为 (rs << RESIZE_STAMP_SHIFT) + 2 第一次调用transfer 会+2 每个线程退出transfer会-1 最终 sizeCtl 底16bit部分值为1 ,用于作状态标记数组转移任务已经完成
5.1 调用 java.util.concurrent.ConcurrentHashMap#transfer 扩容
java.util.concurrent.ConcurrentHashMap#transfer (协助)扩容数组
1 计算每个线程每次获取任务的步幅 stride 和当前CPU数量有关,最小为16
2 如果 nextTable 为null 创建新数组,长度为原来的2倍,并赋值nextTable
3 for循环
3.1 CAS更新 TRANSFERINDEX 获取当前迁移任务,该字段为全局volatile 变量,线程可见,多线程通过CAS更新
3.2 遍历当前任务范围的节点,如果为null CAS塞入 ForwardingNode 类型节点,该节点指向 nextTab
3.3 如果节点hashcode == MOVED 代表当前处理的旧数组的槽已完成迁移,重新获取任务或者当前任务范围游标递减
3.4 如果是当前任务范围内节点,synchronized 当前节点 进行扩容
3.4.1 doublecheck 当前节点和synchronized 之前获取当前节点的引用是否为同一个节点
3.4.1 如果当前槽节点的hashcode大于等于0 代表类型为链表
由于扩容后数组长度为原来的2倍,扩容前槽链表节点映射到新数组只有两种情况,在原来相同位置或者原来位置+n 的位置 ,在处理链表时候,比较的是节点hashcode&n 如果0 原来位置,1 原来位置+n
为了提高迁移速度,遍历链表 找到最后一段连续相同哈希特征的起始节点 ,然后从头遍历链表将相同的高位节点移动到对应的槽,直到遇到相同后缀起始节点。
3.4.1 如果当前槽节点类型为 TreeBin 代表节点类型为红黑树
红黑树TreeBin维护了红黑树结构同时还维护了一个双向链表,遍历链表,比较高位hash值,分别放在高低槽节点位置,然后判断数量,如果扩容后节点数量小于等于6 将红黑树退化为链表,否则根据双向链表创建红黑树
3.5 当原有的数组所有节点都被迁移完后, TRANSFERINDEX 一定是 <= 0 ,当前线程退出迁移操作
3.5.1 CAS更新 sizeCtl低16bit -1 扣减当前线程数量
如果扣减前 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 代表最后一个退出线程,会从原数组n-1开始将数组检查一遍节点是否有迁移完成 后退出
java.util.concurrent.ConcurrentHashMap#treeifyBin 链表插入mapping后,槽内mapping数量超过8个,当前槽转换为红黑树
1 如果 table数组长度小于 64 ,调用 tryPresize 扩容
2 否则判断指定槽不为null并且节点hashcode>=0(代表当前节点为数组),如果是 进入3
3 synchronized 当前槽,重新获取当前槽 doublecheck 是否和上锁之前一个对象,如果是4
4 将链表转换为红黑树 TreeBin
java.util.concurrent.ConcurrentHashMap#initTable 初始化数组
使用CAS方式更新table引用
1 判断table是否为null或者table.length == 0 如果是进入2
2 如果 sizeCtl <0 表示正在初始化(实际为-1) yield放弃CPU,重新进入1。否则进入3
3 CAS更新 sizeCtl 为-1 成功后,doublecheck table == null|| table.length == 0 如果是进入4
4 初始化之前 sizeCtl 保存的是数组实际的长度,使用该值new数组,然后赋值给table,重新计算 sizeCtl 值为实际长度的0.75
5 初始化成功,否则继续循环CAS初始化
计算哈希的时候,将key的hashcode的高16bit和低16bit 进行或操作,这样可以将高16bit的差异信息带到低位
初始化操作: 乐观锁方式 ,死循环+CAS更新 SIZECTL 为-1 代表正在初始化,成功更新后,进行doublecheck,数组table是否为空,长度是否等于0,这样避免重复初始化。创建数组默认长度16,赋值给table,然后回滚 SIZECTL
帮助扩容逻辑: java.util.concurrent.ConcurrentHashMap#helpTransfer
1 计算当前线程进行扩容时候的 resizeStamp ,该值用于保证进行扩容的多个线程是同一批次的,取当前table数组长度前缀0个数, 因为数组长度是2的幂,所以 前缀0个数 是唯一的,所以可以保证同一批次。然后和 (1 << (RESIZE_STAMP_BITS - 1)) 进行与操作,保证第16bit为1 。然后进入步骤2
2 while 判断nexttab tab 是否是原来的对象 已经 sizeCtl 是否小于0 (-1 代表正在初始化,小于-1 代表正在扩容线程数量+1) 如果满足条件 进行3
3 判断是否有比较进行协助扩容,判断逻辑如下:
(sc >>> RESIZE_STAMP_SHIFT) != rs // sc 小于0,代表正在扩容,分为高16(固定值 resizeStamp) 和底16(执行任务转移的线程数量+1)两个部分 正常情况下,参与转移任务的线程会+1(首次执行transfer的线程会+2 剩余一个用于标识状态)转移任务
// 执行完成后-1 ,正常情况下 不会影响高16数据,如果当前线程在执行扩容时候是延后的线程,比如时间a 进行扩容完成后,会分别更新sizectrl 和 table 可能存在这样的情况,当前线程获取的 sizeCtl 和 table 不是同一个数组的数据
// 或者 另外线程进行扩容完成后,再次进入while循环,此时任然 s >= (long) (sc = sizeCtl) ,进行第二次扩容,但是慢的线程还在第一次,导致 高16bit不相等
|| sc == rs + 1 // 最先开始调用transfer进行扩容的线程会+2 ,当所有任务分配给线程并完成后,每个线程在退出transfer方法时会 -1 ,这样最后一个线程退出后,当剩余1 所以这里使用+1判断
|| sc == rs + MAX_RESIZERS // 超过了最大的helper线程数 实际是 MAX_RESIZERS -1 因为第一个执行扩容操作的线程 sizeCtl 是+2
|| (nt = nextTable) == null // transfer 执行完成后会将扩容后的数组赋值为 table,并将原来的 nextTable 赋值为null 或者 nextTable 还未初始化(第一个扩容线程还未初始化完成)
|| transferIndex <= 0 // 整个数据被转移完后 transferIndex会被变成小于等于0并保留直到下一次转移
如果满足上面条件,代表当前轮次的扩容已经完成,中断while循环结束协助扩容逻辑,否则执行步骤4
4 CAS更新 SIZECTL 为 SIZECTL+1 ,代表进行扩容的线程数量+1,调用 transfer方法协助扩容,在扩容完成后,扣减扩容数量。完成扩容后 中断while循环结束协助扩容逻辑,否则跳转2 (可能是因为CAS未成功,需要重新尝试)
扩容逻辑: java.util.concurrent.ConcurrentHashMap#transfer
触发时间点: 插入元素,mapping数量大于 sizeCtl (正常情况下,该值为扩容阈值,未初始时候,保存数组长度,初始化时候为-1 避免线程同时扩容,小于-1 代表正在迁移)
第一次扩容开始扩容的时候,sizeCtl >0 , 扩容线程会将 sizeCtl 更新为 (rs << RESIZE_STAMP_SHIFT) + 2 因为rs第16bit为1 位移后,整个 sizeCtl 为负数,低16bit为进行扩容线程数量,第一次赋值为2有两个方面原因,作为开始扩容的标识,同时包含一个扩容线程数量。
每个线程执行完 transfer 后会sizectrl -1 ,并且判断 (sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT ,为fale直接结束transfer,否则最后进行一次数组遍历检查,只有最后一个线程退出transfer的时候会满足。所以第一次调用transfer的时候是+2 作为一个标识。另外最后一个线程检查完后
退出transfer扩容逻辑, sizeCtl 是 +1 的状态,后续线程如果要进行扩容,会判断 sizeCtl 是否等于+1 如果是代表数组扩容完成。如果是+1 会导致最后扩容完成 sizestap位移16位的结果,不能判断是未开始扩容还是已经扩容完成。总结: 扩容操作中 +2 代表当前轮次开始扩容的第一个线程,+1 代表扩容完成状态
如果是后续扩容,sizeCtl 为负数,会检当前扩容操作是否完成,检查逻辑如上面步骤。
扩容步骤如下:
1 根据原有数组长度 CPU个数计算线程每次任务步长 stride ,最少为 16
2 初始化扩容后的数组 nextTab ,扩容后长度为原来数组长度的2倍 ,初始化 nextTab 之后直接赋值,不用担心多线程扩容重复初始化 nextTab 数组,因为第一个执行扩容操作的线程 首先通过CAS将 sizeCtl 修改为负数,后续扩容或者协助扩容的线程都会判断 nextTab 是否为空,如果不是才扩容,所以不会出现竞争
3 for循环 遍历当前领取任务范围
4 计算当前线程进行扩容的区间范围,多线程通过 CAS更新 TRANSFERINDEX 方式进行同步,该值为volatile全局变量,初始化为原数组长度,每次扣减 stride 直到小于等于0 代表原有数组扩容转移任务领取完成
因为是CAS方式更新,所以在外层增加了while循环
5 当前i位置节点如果为 null 直接赋值为 ForwardingNode 对象,该对象hashcode为负数,在查询时候,会判断槽节点hashcode,如果为负数会根据节点类型判断,可能为TreeBin或者 ForwardingNode 。ForwardingNode 对象维护了nexttab ,查询时候会到nexttab新数组上获取元素
当前i位置节点hashcode如果为 Moved (对应 ForwardingNode 对象),进入下一次循环,更新i继续进行判断
当前i位置节点如果不为上面两种情况,代表是正常数据(链表或者红黑树) , 该槽上synchronized 锁,同时doublecheck 下,重新根据hashcode获取槽节点和当前获取的槽节点判断是否为同一个(==方式判断),然后判断如果槽节点hash错的大于等于0 代表链表,如果类型为TreeBin 代表红黑树
链表扩容操作: 首先从头到尾遍历链表,每个节点的hashcode 和 数组长度n 位与操作,找到链表最后连续的相同的bit位的 lastRun ,比较n代表的是扩容后计算hash使用mask的最高位,移动链表的时候直接移动,
然后从头开始遍历链表直到遇到 lastRun 过程中根据节点的hash值和n位与操作,判断是移动还是保留在原位置,最终得到两个链表,分别放在原来的槽位置和+n的槽位置
红黑树维护了包含所有树节点的双向链表,遍历双向链表,根据节点的hash值和n与操作,等于0代表放在原来槽位置,等于1代表放在+n槽位置,得到高低槽位置的两个链表后塞入nexttab对应位置,如果拆分后节点数量小于 6 将红黑树转换为单链表(实际就是根据双向链表生成单链表)
红黑树每个节点为一个槽,在对节点进行数据修改都是通过 synchronized 保证多线程数据安全性。
6 当最后一个线程获取完任务后, TRANSFERINDEX 被更新为 <=0 ,代表转移任务分配完成,当前线程修改 sizeCtl -1 扣减进入transfer时候增加的转移线程数量,退出时候会判断 (sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 如果成立(只有最后一个退出线程成立) 会重新遍历老数组检查是否有遗漏,最后一个线程退出前更新 sizeCtl 为新数组扩容阈值, nextTable = null; table = nextTab;
插入槽节点的红黑树步骤: java.util.concurrent.ConcurrentHashMap.TreeBin#putTreeVal
红黑树本质是平衡树,从根节点开始查找,根据遍历节点和待插入节点hashcode大小判断待插入左右子树,如果hashcode相同,并且key也相同,返回查找到的节点。存在这样的情况,由于hash冲突,hashcode相同,但是key不相同,使用compareto进行比较,如果结果为0 ,分别在左右子树上查找,查找到节点返回。否则判断 遍历节点和待插入节点是否同时不为null并且类名称相同 比较两个节点的 System.identityHashCode 。然后插以红色节点插入叶子节点,有必要时调整树结构
将链表转换为红黑树步骤: java.util.concurrent.ConcurrentHashMap#treeifyBin
1 如果数组长度小于 64 尝试扩容,而不是转换为红黑树
2 给槽节点上 synchronized 锁,doublecheck 是否被修改,遍历链表,每个节点创建 TreeNode节点,并维护节点的pre 和 next,以双链表创建红黑树。红黑树维护了所有节点的双链表,节点数据转移时候方便遍历。
删|改 java.util.concurrent.ConcurrentHashMap#replaceNode
查询逻辑和插入数据逻辑是一致的
1 判断key是否存在,根据key的hashcode计算槽位置,如果为null,直接返回。
2 如果槽节点hashcode == MOVED 代表数组正在扩容,当前线程协助扩容。扩容后会更新数组,所以需要重新查找槽节点,所以整个替换逻辑放在for循环里面,只要table != null 就循环
3 除了以上两种情况外,给槽节点上 synchronized 锁,doublecheck 槽节点是否为原来节点,然后根据槽节点数据类型替换数据
3.1 如果槽节点hashcode >=0 代表链表,遍历 查找hashcode相同,key相同,如果查找到,更新上一节点next为当前节点的next,将当前节点从链表中剔除
3.2 如果槽节点类型为TreeBin 代表红黑树,在红黑树中查找节点同时更换value
4 该方法传递参数如果为null,代表将查找到节点删除,如果是这种情况,更新 baseCount
ConcurrentHashMap 维护mapping数量的数据结构:
使用LongAdder的方式统计map中的 mapping
维护一个公共计数器 BASECOUNT ,线程竞争不激烈时CAS方式累加 BASECOUNT ,如果失败,根据当前线程的 threadLocalRandomProbe 计算在 counterCells 数组上的槽节点,然后在该槽节点上尝试CAS,如果失败更新线程的 threadLocalRandomProbe 再散列
counterCells 是线程共享结构,通过 volatile 变量 cellsBusy 进行线程同步,更改数组结构操作必须通过CAS方式更新 cellsBusy 为1 。该数组长度最多为系统CPU数量,竞争相当激烈会扩容 counterCells 数组,长度2倍直到超过CPU数量
java.util.concurrent.ConcurrentHashMap#compute
和 replaceNode 相同逻辑,不同的是,如果槽节点为null,会通过CAS将该槽节点塞入 ReservationNode 对象。
查
使用LongAdder的方式统计map中的 key-value 数量,维护了 baseCount 和 counterCells 数组,竞争不激烈,CAS更新 baseCount 如果竞争激烈会
根据当前线程的 threadLocalRandomProbe 在 counterCells 上进行散列,然后进行CAS累加操作,这样避免多个线程竞争同一个变量。获取mapping数量的时候
遍历 counterCells 数组累加到 baseCount上
根据key查询mapping步骤: java.util.concurrent.ConcurrentHashMap#get
对于查找逻辑,如果是链表,顺序遍历链表直到找到节点数据。如果是其他类型节点调用父类Node的find方法,所有子类都override了find方法。
1 根据参数key计算hashcode,得到对应的槽节点
2 判断槽节点是否和key相等(hashcode 相等并且(地址相等||equals相等)) 如果是返回key对应的value
否则 判断节点的hashcode如果小于0 代表当前节点为红黑树 或者 ForwardingNode 调用对应类的find方法
2.1 如果是红黑树
检查 lockState 是否为 Writer|Waiter状态 如果是,遍历TreeBin维护的双向链表查询数据
否则CAS 更新 lockState 在Reader比特位累加 Reader,如果有其他写线程,会CAS lockState 累加 Waiter比特位,完成查询操作后,lockState CAS 扣减 Reader比特位,最后一个读线程完成操作后,会释放阻塞等待的写线程
2.2 如果是 ForwardingNode ,到nextTable 上查找,查找逻辑还是hashcode散列判断对应槽hashcode,
如果小于0,并且类型为 ForwardingNode 更新遍历table为新的nextTable,重新进入for循环查找,如果不是 ForwardingNode 类型,调用对应类型的find方法
如果上面情况都不是,更新遍历节点为next继续循环查找。外部为for死循环,直到查找对应元素或者next为null或者找到匹配的key返回
2.3 如果是 ReservationNode(直接返回null)
/**
* putVal方法可以分为以下几步:
* 1、检查key/value是否为空,如果为空,则抛异常,否则进行2
* 2、进入for死循环,进行3
* 3、检查table是否初始化了,如果没有,则调用initTable()进行初始化然后进行 2,否则进行4
* 4、根据key的hash值计算出其应该在table中储存的位置i,取出table[i]的节点用f表示。
* 根据f的不同有如下三种情况:
* 1)如果table[i]==null(即该位置的节点为空,没有发生碰撞),则利用CAS操作直接存储在该位置,如果CAS操作成功则退出死循环。
* 2)如果table[i]!=null(即该位置已经有其它节点,发生碰撞),碰撞处理也有两种情况
* 2.1)检查table[i]的节点的hash是否等于MOVED,如果等于,则检测到正在扩容,则帮助其扩容
* 2.2)说明table[i]的节点的hash值不等于MOVED,如果table[i]为链表节点,则将此节点插入链表中即可
* 如果table[i]为树节点,则将此节点插入树中即可。插入成功后,进行 5
* 5、如果table[i]的节点是链表节点,则检查table的第i个位置的链表是否需要转化为数,如果需要则调用treeifyBin函数进行转化
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();// key和value不允许null
int hash = spread(key.hashCode());//两次hash,减少hash冲突,可以均匀分布
int binCount = 0;//i处结点标志,0: 未加入新结点, 2: TreeBin或链表结点数, 其它:链表结点数。主要用于每次加入结点后查看是否要由链表转为红黑树
for (Node<K, V>[] tab = table; ; ) {//CAS经典写法,不成功无限重试
Node<K, V> f;
int n, i, fh;
//检查是否初始化了,如果没有,则初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
/**
* i=(n-1)&hash 等价于i=hash%n(前提是n为2的幂次方).即取出table中位置的节点用f表示。 有如下两种情况:
* 1、如果table[i]==null(即该位置的节点为空,没有发生碰撞),则利用CAS操作直接存储在该位置, 如果CAS操作成功则退出死循环。
* 2、如果table[i]!=null(即该位置已经有其它节点,发生碰撞)
*/
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K, V>(hash, key, value, null)))
break; // no lock when adding to empty bin
} else if ((fh = f.hash) == MOVED)//检查table[i]的节点的hash是否等于MOVED,如果等于,则检测到正在扩容,则帮助其扩容
tab = helpTransfer(tab, f);
else {//table[i]的节点的hash值不等于MOVED。
V oldVal = null;
// 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突
synchronized (f) {
// 这里判断 == f 难道 synchronized 之后 i 位置的对象会被更新为其他对象,比如扩容 transfer
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;// 代表链表长度
for (Node<K, V> e = f; ; ++binCount) {
K ek;
// 如果在链表中找到值为key的节点e,直接设置e.val = value即可
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 如果没有找到值为key的节点,直接新建Node并加入链表即可
Node<K, V> pred = e;
if ((e = e.next) == null) {//插入到链表末尾并跳出循环
pred.next = new Node<K, V>(hash, key,
value, null);
break;
}
}
} else if (f instanceof TreeBin) {// 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作
Node<K, V> p;
// 红黑树场景 默认为2 这个变量对于链表是长度,对于红黑树是标记,满足后面的不等于0判断,然后返回 oldVal
binCount = 2;
// putTreeVal 中调用 lockRoot(); 实现红黑树节点的读写同步
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果节点数>=8,那么转换链表结构为红黑树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);//若length<64,直接tryPresize,两倍table.length;不转红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
// 计数增加1,有可能触发transfer操作(扩容)
addCount(1L, binCount);
return null;
}
// CAS + 死循环,初始化数组 table 直到成功,返回 table ,并且 sizeCtl = 12
// 死循环过程中判断 sizeCtl ,如果小于0 代表有其他线程在初始化table 当前线程yield
// 完成后 sizeCtl 为当前数组长度的 1/4 table被初始化
private final Node<K, V>[] initTable() {
Node<K, V>[] tab;
int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 双重校验
// 初始化线程进入这里,并且初始化完成,执行完finally代码快 sizeCtl 被更新为0
// 这个时候其他线程 执行上面的else if 判断 ,成功,再次进入该代码块
// 所以为了避免重复初始化table 所以需要双重校验
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
private final void lockRoot() {
// 只有写线程会走这个方法,读线程直接获取TreeBin的 lockStatus 状态然后CAS直接更新
// 写线程调用当前方法之前已经在槽节点添加 synchronized 了,所以不存在写和写竞争,只能是读写竞争
// 场景 读写 读写读
if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER)) // 竞争失败只能是已经有读线程更新了 lockstate 状态
// 可能CAS时候 lockstate 不为0 或者 单纯的CAS失败
contendedLock(); // offload to separate method
}
// 场景1 hash冲突,在同一个槽塞入元素,该槽类型为红黑树,并且插入后的节点违反了红黑树的性质需要调整红黑树的结构
// 第一个线程获取了锁,将lockstate 更新为 writer,后续写线程同样进入了该for循环,会更新lockstate|waiter 并park
// 场景2 读线程更新成功,写线程更新了waiter为当前线程,这个时候读线程释放了锁,写线程再次for循环判断lockstate状态
// 这个时候第一个if命中,并且waiting为true,waiter为当前线程,这个时候CAS更新lockstate增加writer bit位,同时清理waiter
private final void contendedLock() {
boolean waiting = false;
for (int s; ; ) {
// 场景: 进入该方法时候读写线程竞争,当前写线程竞争失败,到这里读线程已经释放锁,这个时候判断为true
if (((s = lockState) & ~WAITER) == 0) { // 当前位置的TreeBin 没有reader 和writer,当前lockState 可能为0 或者 Waiter
// 这个时候又出现读线程竞争并成功,当前写线程竞争失败
if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
if (waiting) // 之前成功更新了WAITER,因为有读线程占用,进入第二次循环的时候,读线程释放了锁,
// 并且上面的CAS 更新WRITER成功,代表成功获取了锁,这里上一轮次更新的waiter赋值为null
waiter = null;
return;
}
} else if ((s & WAITER) == 0) { // 当前位置的TreeBin 没有Waiter 可能是 reader 或者 writer
if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
waiting = true;
waiter = Thread.currentThread();
}
// CAS更新之后不直接park ,直接进入下一次循环,可能在下一次循环时候,读线程释放了读锁,当前写线程再次尝试CAS获取锁成功了就不阻塞 提高了并发性
} else if (waiting)
LockSupport.park(this);
}
}
// 增加元素计数,并且如果哈希表太小且尚未处于扩容状态,就会启动扩容转移操作。
// 如果已经在进行扩容转移操作,那么在有工作可做的情况下,会协助执行扩容转移任务。
// 在一次扩容转移操作完成后,会重新检查元素数量,以查看是否由于扩容操作滞后于元素添加操作而已经需要进行另一次扩容。
private final void addCount(long x, int check) {
CounterCell[] as;
long b, s;
// 如果 countercells为空才竞争 basecount ,因为countercells数组都存在的代表竞争很激烈
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a;
long v;
int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K, V>[] tab, nt;
int n, sc;
while (s >= (long) (sc = sizeCtl) && (tab = table) != null && // 超过了阈值进行扩容
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;// 根据数组长度计算迭代表示位,因为数组长度是*2增长,所以每个stamp 不会重复,第16bit 一定是1
if (sc < 0) {
if (/*(sc >>> RESIZE_STAMP_SHIFT) != rs // sc 小于0,代表正在扩容,分为高16(固定值 resizeStamp) 和底16(执行任务转移的线程数量+1)两个部分 正常情况下,参与转移任务的线程会+1(首次执行transfer的线程会+2 剩余一个用于标识状态)转移任务
// 执行完成后-1 ,正常情况下 不会影响高16数据,如果当前线程在执行扩容时候是延后的线程,比如时间a 进行扩容完成后,会分别更新sizectrl 和 table 可能存在这样的情况,当前线程获取的 sizeCtl 和 table 不是同一个数组的数据
// 或者 另外线程进行扩容完成后,再次进入while循环,此时任然 s >= (long) (sc = sizeCtl) ,进行第二次扩容,但是慢的线程还在第一次,导致 高16bit不相等
|| */
sc == rs + 1 // 最先开始调用transfer进行扩容的线程会+2 ,当所有任务分配给线程并完成后,每个线程在退出transfer方法时会 -1 ,这样最后一个线程退出后,当剩余1 所以这里使用+1判断
|| sc == rs + MAX_RESIZERS // 超过了最大的helper线程数 实际是 MAX_RESIZERS -1 因为第一个执行扩容操作的线程 sizeCtl 是+2
|| (nt = nextTable) == null // transfer 执行完成后会将扩容后的数组赋值为 table,并将原来的 nextTable 赋值为null 或者 nextTable 还未初始化(第一个扩容线程还未初始化完成)
|| transferIndex <= 0) // 整个数据被转移完后 transferIndex会被变成小于等于0并保留直到下一次转移
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
} else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)) // 第一次调用transfer 会+2 每个线程退出transfer会-1 最终 sizeCtl 底16bit部分值为1 ,用于作状态标记数组转移任务已经完成
// rs 被分为了两个部分,高16bit 和低16bit
transfer(tab, null);
s = sumCount();
}
}
}
// 对于lockState属性取值说明: 因为涉及红黑树结构更新的操作使用了 synchronized 进行控制,所以任何时候只有一个线程能修改红黑树结构 ,该变量用于红黑树结构更新时候,同时另外线程查询操作场景下进行线程同步。因为更新加synchronized,所以不存在写写线程同步的状态(或者说该字段 lockState 不处理写写线程同步),只是读写线程同步。该字段涉及如下场景
//场景1 : 读写
// CAS更新 lockState 从0 更新为 Reader ,后续写操作线程期望将 lockState 从0 CAS更新为Writer失败,这个时候会在 lockState 添加 Waiter比特位,然后调用 LockSupport阻塞自己
// 读线程完成数据操作,调用CAS 更新 lockState 扣减Reader 比特位(相当于Reader线程数量--,因为Reader在高位,可以重复累加标识为读线程数量), 如果时候读线程之前 lockState == Waiter|Reader 代表有线程在等待,调用 LockSupport 唤醒阻塞线程
//
//场景1 : 读写读
// 第二次读时候,因为 lockState 状态为 Reader|Waiter 所以在 find 方法中遍历红黑树维护的双向链表,需要注意的是,遍历过程中都会尝试CAS更新 lockState 状态增加Reader比特位 一但成功,还是走红黑树查找
//场景2 : 读读写读
// 第二次读会累加 lockState 的 Reader比特位,写线程依然阻塞,第三个读走链表遍历。两个读线程释放 lockState 会扣减Reader的比特位,只有当最后一个读线程释放的时候会唤醒等待的写线程
//场景3 : 写读
// 写线程成功CAS更新 lockState 添加 Writer比特位(更新前 lockState 值为0 ),读线程会走链表方式解锁
扩容算法
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;// 为啥直接赋值,而不是CAS方式判断一下其他线程是否已经赋值 因为第一次进入该方法的时候会CAS更新 sizeCtl 只有成功的才能进入,后续的进入该方法需要判断 sizeCtl 和 nextTable 是否为null 所以不需要CAS更新
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
boolean advance = true; // 代表当前处理的旧数组的槽已完成迁移
boolean finishing = false; // to ensure sweep before committing nextTab 老数组所有数据被迁移完,将table指向新数组,更新sizeCtrl
// 当前线程通过 transferIndex 计算当前轮次负载转移的范围 [transferIndex-stride,transferIndex-1]
// transfer 是volatile 全局变量通过CAS更新,并且递减所以多线程移动范围不会重叠或者遗漏
for (int i = 0, bound = 0; ; ) {
Node<K, V> f;
int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing) // 更新i
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;// 整个for循环结束 完成了old 数组的所有节点遍历,i=-1 是用于跳出下面的if判断
advance = false; // 跳出当前while的判断
} else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 最后一个stride对应的任务被线程获取后, transferindex <= 0 直到 transfer任务执行完成也不会变
// 存在这样的场景,整个数组实际已经转移完了,后续有线程进入,会在上面的 else if 判断被捕获,从而直接返回
bound = nextBound; // i递减的下区间
i = nextIndex - 1;// [boudn i] 闭区间
advance = false;
}
}
// i<0是上面的while的第二个判断结果 只有这个条件满足后给i赋值,才能进入该条件判断,其他地方没有给i赋值的逻辑
// i >= n || i+n >= nextn 是进入当前if后更新的结果
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 线程完成 recheck 检查之后会走到这里
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 进入该方法之前,会更新 SIZECTL 为 (rs << RESIZE_STAMP_SHIFT) + 2 ,判断条件为 SIZECTL > 0 也就是第一次调用transfer
// 这里判断不等于-2 代表已经有其他线程完了的 recheck 所以这里直接返回 这样保证了只有一个执行转移任务线程完成转移后进行recheck
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 场景: 调用 transfer 的时候会判断sizectrl大于0的时候,将 sizectrl 修改为+2 ,相当于第一次 能进入这个地方,代表老数组所有节点已经被迁移完成
// 比如有多个线程在进行迁移,只要有线程将任务领取完(transferindex <=0 ,因为每次领取任务都会递减移动该volatile) 如果其他线程完成了迁移任务,会走到这里
// 这里将 finishing 赋值为 true 为的是从n-1开始将数组检查一遍,上面的while里面因为有finishing 会一直递减
// advance 赋值为 true 是为了进入上面的while 循环 ,让i--
finishing = advance = true;
i = n; // recheck before commit
}
} else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K, V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K, V> lastRun = f;
for (Node<K, V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
} else {
hn = lastRun;
ln = null;
}
for (Node<K, V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K, V>(ph, pk, pv, ln);
else
hn = new Node<K, V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
} else if (f instanceof TreeBin) {
TreeBin<K, V> t = (TreeBin<K, V>) f;
TreeNode<K, V> lo = null, loTail = null;
TreeNode<K, V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K, V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K, V> p = new TreeNode<K, V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果小于等于6 将原来的红黑树退回为链表 ,否则重新构建红黑树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K, V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K, V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
面试经典问题
问题: 红黑树使用了哪些方式保证了线程同步
1 synchronized 插入数据时,给当前槽上锁
2 CAS+死循环+doublecheck ,初始化数组,会CAS更新sizectrl 为-1 成功更新后会 doublecheck table == null || table.length == 0
3 如果槽节点为红黑树,如何保证节点读写一致性(插入数据同时有其他读线程操作)
ConcurrentHashMap 维护了内部类 Node,该类是红黑树以及链表的父类,并且维护了next属性,所以无论槽节点第一个节点类型,都可以沿着next遍历,并且该Node还是 红黑树(TreeBin) ReservationNode 和 ForwardingNode 三个类override父为了的find方法,在查询时候执行自己的查找逻辑
TreeBin 中的 lockState 讲解
读场景关联代码 java.util.concurrent.ConcurrentHashMap.TreeBin#find
写场景关联代码 lockRoot() 调用的地方
这个状态是用于同步读写线程的,由于写会给TreeBin 添加 synchronized 锁,所以 lockState 不存在写写线程同步的状态(或者说该字段不处理写写线程同步),只是读写线程同步
场景1 : 读写
CAS更新 lockState 从0 更新为 Reader ,后续写操作线程期望将 lockState 从0 CAS更新为Writer失败,这个时候会在 lockState 添加 Waiter比特位,然后调用 LockSupport阻塞自己
读线程完成数据操作,调用CAS 更新 lockState 扣减Reader 比特位(相当于Reader线程数量--,因为Reader在高位,可以重复累加标识为读线程数量), 如果时候读线程之前 lockState == Waiter|Reader 代表有线程在等待,调用 LockSupport 唤醒阻塞线程
场景1 : 读写读
第二次读时候,因为 lockState 状态为 Reader|Waiter 所以在 find 方法中遍历红黑树维护的双向链表,需要注意的是,遍历过程中都会尝试CAS更新 lockState 状态增加Reader比特位 一但成功,还是走红黑树查找
场景2 : 读读写读
第二次读会累加 lockState 的 Reader比特位,写线程依然阻塞,第三个读走链表遍历。两个读线程释放 lockState 会扣减Reader的比特位,只有当最后一个读线程释放的时候会唤醒等待的写线程
场景3 : 写读
写线程成功CAS更新 lockState 添加 Writer比特位(更新前 lockState 值为0 ),读线程会走链表方式解锁
就是一个读线程和一个写线程的竞争,期间读线程任何时候都可能结束,并且重新获取读锁。读写竞争的代码
读: java.util.concurrent.ConcurrentHashMap.TreeBin#find
写: java.util.concurrent.ConcurrentHashMap.TreeBin#lockRoot
问题:hashmap put 方法流程
1 如果map为空或者长度为0 ,调用resize() 初始化map
2 根据key对应的hash映射在Node数组上的位置,如果为null,将当前key-value封装为Node对象放入该位置
3 如果不为null,该位置第一个节点的类型,如果是 TreeNode ,调用 putTreeVal() 将key-value插入红黑树
如果第一个节点类型为链表,从头遍历。判断插入的key和链表节点是否相等(hashcode判断,然后地址,然后equals()) ,如果相等,替换原有值。否则将key-value封装为Node插入链表末尾
插入节点后,如果当前链表长度大于 TREEIFY_THRESHOLD(8) ,将当前链表转换为红黑树
4 元素插入完成后,Map的key-value 数量如果大于 threshold ,对map进行扩容(resize())
5 HashMap的put方法额外定义了钩子方法,在插入元素成功后调用(afterNodeInsertion) 或者插入元素已经存在更新值后调用(afterNodeAccess())