ConcurrentSkipListMap-跳跃表 源码解析
问题
- 跳跃表长什么样子呢?
- 跳跃表如何查找指定 key 数据呢?
- 跳跃表如何添加指定 key-value 数据呢?
- 跳跃表如何删除指定 key 数据呢?
理论知识
跳表是一个随机化的数据结构,实质就是一种可以进行二分查找的有序链表。
跳表在原有的有序链表上面增加了多级索引,通过索引来实现快速查找。
跳表不仅能提高搜索性能,同时也可以提高插入和删除操作的性能。
- 跳跃表 SkipList 的数据结构
最底层的节点,每一个都是 Node 节点,按照 key 升序排序。而上方具有等级的节点,是索引节点,上图的跳跃表是具有 3 层索引的跳跃表,第一层 level-1 的索引密度最大,第二层 level-2 是对一层索引的索引,第三层 level-3 同理,越往上密度越小,跨度越大。
head 是 ConcurrentSkipListMap 中一个非常重要的字段,它就是通过 head 去关联整个跳跃表,它永远指向 BaseHeader 最上层的索引节点 headIndex,以这个索引节点为入口访问整个跳跃表。
- 跳跃表的原则
- 最底层的数据节点按照关键字升序排序
- 包含多级索引,每个级别的索引节点按照其关联数据节点的关键字 key 升序排列
- 高级别索引是低级别索引的子集
- 如果关键字 key 在级别
level = i
的索引中出现, 则级别level <= i
的索引中都包含 key
源码解析
字段和内部数据结构
- BASE_HEADER 头结点
- head 指向 node(BASE_HEADER) 的顶层索引
- Node 静态内部类(最底层的节点 存储 key-value)
- Index 静态内部类(索引节点)
- HeadIndex 静态内部类(头索引节点)
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
implements ConcurrentNavigableMap<K,V>, Cloneable, Serializble {
// 最底层的节点的head
private static final Object BASE_HEADER = new Object();
// head 节点
private transient volatile HeadIndex<K,V> head;
// 比较器
final Comparator<? super K> comparator;
// key
private transient KeySet<K> keySet;
// 元组
private transient EntrySet<K,V> entrySet;
// 值
private transient Values<V> values;
private transient ConcurrentNavigableMap<K,V> descendingMap;
// cas 设置head
private boolean casHead(HeadIndex<K,V> cmp, HeadIndex<K,V> val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
/* ---------------- Nodes -------------- */
// 最底层node节点,单链表结构
static final class Node<K,V> {
final K key;
// 注意:这里value的类型是Object,而不是V
// 在删除元素的时候value会指向当前元素本身
volatile Object value;
volatile Node<K,V> next;
Node(K key, Object value, Node<K,V> next) {
this.key = key;
this.value = value;
this.next = next;
}
boolean casNext(Node<K,V> cmp, Node<K,V> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 协助删除
void helpDelete(Node<K,V> b, Node<K,V> f) {
if (f == next && this == b.next) {
if (f == null || f.value != f) // not already marked
casNext(f, new Node<K,V>(f));
else
b.casNext(this, f.next);
}
}
}
/* ---------------- Indexing -------------- */
// 索引节点,存储着对应的node值,及向下和向右的索引指针
static class Index<K,V> {
// 当前节点
final Node<K,V> node;
// 下面的节点
final Index<K,V> down;
// 右边的节点
volatile Index<K,V> right;
/**
* Creates index node with given values.
*/
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
/**
* compareAndSet right field
*/
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}
/**
* Returns true if the node this indexes has been deleted.
* @return true if indexed node is known to be deleted
*/
final boolean indexesDeletedNode() {
return node.value == null;
}
// 链接节点
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
Node<K,V> n = node;
newSucc.right = succ;
return n.value != null && casRight(succ, newSucc);
}
// 移除节点
final boolean unlink(Index<K,V> succ) {
return node.value != null && casRight(succ, succ.right);
}
}
/* ---------------- Head nodes -------------- */
/**
* Nodes heading each level keep track of their level.
*/
// 头索引节点,继承自Index,并扩展一个level字段,用于记录索引的层级
static final class HeadIndex<K,V> extends Index<K,V> {
// 头节点级别
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
/**
* 节点比较逻辑
* c:比较器
*/
static final int cpr(Comparator c, Object x, Object y) {
return (c != null) ? c.compare(x, y) : ((Comparable)x).compareTo(y);
}
}
构造方法
public ConcurrentSkipListMap() {
this.comparator = null;
initialize();
}
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
this.comparator = comparator;
initialize();
}
public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
this.comparator = null;
initialize();
putAll(m);
}
public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {
this.comparator = m.comparator();
initialize();
buildFromSorted(m);
}
四个构造方法里面都调用了 initialize() 这个方法,那么,这个方法里面有什么呢?
private static final Object BASE_HEADER = new Object();
private void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
// Node(K key, Object value, Node<K,V> next)
// HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level)
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
null, null, 1);
}
可以看到,它初始化了一些属性,并创建了一个头索引节点,里面存储着一个数据节点,这个数据节点的值是空对象,且它的层级是 1。
所以,初始化的时候,跳表中只有一个头索引节点,层级是 1,数据节点是一个空对象,down 和 right 都是 null。
通过内部类的结构我们知道,一个头索引指针包含 node、down、right 三个指针,为了便于理解,我们把指向 node 的指针用虚线表示,其它两个用实线表示,也就是虚线不是表明方向的。
添加元素 put() => doPut()
- 添加指定 key-value 到跳跃表内
public V put(K key, V value) {
// 不能存储value为null的元素
// 因为value为null标记该元素被删除(后面会看到)
if (value == null)
throw new NullPointerException();
// 调用doPut()方法添加元素
return doPut(key, value, false);
}
||
\/
private V doPut(K key, V value, boolean onlyIfAbsent) {
// 添加元素后存储在z中
Node<K,V> z; // added node
// key也不能为null
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
/*
* Part I:找到目标节点的位置并插入
*/
// 这里的目标节点是数据节点,也就是最底层的那条链
// 自旋 outer循环 处理并发冲突 进行重试..等其它需要重试的情况
outer: for (;;) {
// 寻找目标节点之前最近的一个索引对应的数据节点,存储在b中,b=before
// 并把b的下一个数据节点存储在n中,n=next
// 为了便于描述,我这里把b叫做当前节点,n叫做下一个节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
// 如果下一个节点不为空
// 就拿其key与目标节点的key比较,找到目标节点应该插入的位置
if (n != null) {
// v=value,存储节点value值
// c=compare,存储两个节点比较的大小
Object v; int c;
// n的下一个数据节点,也就是b的下一个节点的下一个节点(孙子节点)
Node<K,V> f = n.next;
// 如果n不为b的下一个节点
// 说明有其它线程修改了数据,则跳出内层循环
// 也就是回到了外层循环自旋的位置,从头来过
if (n != b.next) // inconsistent read
break;
// 如果n的value值为空,说明该节点已删除,协助删除节点
if ((v = n.value) == null) { // n is deleted
// todo 这里为啥会协助删除?后面讲
n.helpDelete(b, f);
break;
}
// 如果b的值为空或者v等于n,说明b已被删除
// 这时候n就是marker节点,那b就是被删除的那个
if (b.value == null || v == n) // b is deleted
break;
// 如果目标key与下一个节点的key大
// 说明目标元素所在的位置还在下一个节点的后面
if ((c = cpr(cmp, key, n.key)) > 0) {
// 就把当前节点往后移一位
// 同样的下一个节点也往后移一位
// 再重新检查新n是否为空,它与目标key的关系
b = n;
n = f;
continue;
}
// 如果比较时发现下一个节点的key与目标key相同
// 说明链表中本身就存在目标节点
if (c == 0) {
// 则用新值替换旧值,并返回旧值(onlyIfAbsent=false)
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
// 如果替换旧值时失败,说明其它线程先一步修改了值,从头来过
break; // restart if lost race to replace value
}
// 如果c<0,就往下走,也就是找到了目标节点的位置
// else c < 0; fall through
}
/*
* 有两种情况会到这里
* 1.到链表尾部了,也就是n为null了
* 2.找到了目标节点的位置,也就是上面的c<0
*/
// 新建目标节点,并赋值给z
// 这里把n作为新节点的next
// 如果到链表尾部了,n为null,这毫无疑问
// 如果c<0,则n的key比目标key大,相当于在b和n之间插入目标节点z
z = new Node<K,V>(key, value, n);
// 原子更新b的下一个节点为目标节点z
if (!b.casNext(n, z))
// 如果更新失败,说明其它线程先一步修改了值,从头来过
break; // restart if lost race to append to b
// 如果更新成功,跳出自旋状态
break outer;
}
}
// 经过Part I,目标节点已经插入到有序链表中了
/*
* Part II:随机决定是否需要建立索引及其层次,如果需要则建立自上而下的索引
*/
// 取个随机数
int rnd = ThreadLocalRandom.nextSecondarySeed();
// 0x80000001展开为二进制为10000000000000000000000000000001
// 只有两头是1
// 这里(rnd & 0x80000001) == 0
// 相当于排除了负数(负数最高位是1),排除了奇数(奇数最低位是1)
// 只有最高位最低位都不为1的数跟0x80000001做&操作才会为0(也就是最高位和最低位都为0)
// 也就是正偶数,概率为1/4
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
// level 表示z(新插)节点的索引级别,默认level为1,也就是只要到这里了就会至少建立一层索引
// max 表示当前跳跃表 索引最大级别
int level = 1, max;
// 随机数从最低位的第二位开始,有几个连续的1则level就加几
// 因为最低位肯定是0,正偶数嘛
// 比如,1100110,从最低位的第二位往前数,总共有两个1连续,level就加2
while (((rnd >>>= 1) & 1) != 0)
++level;
// idx 最终指向z节点未处理前后关系的索引(用于记录目标节点建立的最高的那层索引节点)
Index<K,V> idx = null;
// 取头索引节点(这是最高层的头索引节点,左上角的HeadIndex)
HeadIndex<K,V> h = head;
// case1
// 如果生成的层数小于等于当前最高层的层级
// 也就是跳表的高度不会超过现有高度
if (level <= (max = h.level)) {
// 从第一层开始建立一条竖直的索引链表
// 这条链表使用down指针连接起来
// 每个索引节点里面都存储着目标节点这个数据节点
// 最后idx存储的是这条索引链表的最高层节点
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
// index-3 ← idx
// ↓(down指针)
// index-2
// ↓(down指针)
// index-1
// ↓(down指针)
// Node(z)
}
// case2
else { // try to grow by one level
// 重新计算level的值,设置成更合理的值
// 如果新的层数超过了现有跳表的高度,则最多只增加一层
// 比如现在只有一层索引,那下一次最多增加到两层索引,增加多了也没有意义
level = max + 1; // hold in array and later pick the one to use
// idxs用于存储目标节点建立的竖起索引的所有索引节点
// 其实这里直接使用idx这个最高节点也是可以完成的
// 只是用一个数组存储所有节点要方便一些
// 注意,这里数组0号位是没有使用的
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
// 从第一层开始建立一条竖的索引链表(跟上面一样,只是这里顺便把索引节点放到数组里面了)
// 看完这个for循环 就清楚了..原来 index[0] 的这个数组slot 并没有使用..只使用 [1,level] 这些数组slot了。
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
// index-4 ← idx
// ↓(down指针)
// index-3
// ↓(down指针)
// index-2
// ↓(down指针)
// index-1
// ↓(down指针)
// Node(z)
// 自旋
for (;;) {
// 旧的最高层头索引节点
h = head;
// 旧的最高层级
int oldLevel = h.level;
// 再次检查,如果旧的最高层级已经不比新层级矮了
// 说明有其它线程先一步修改了值,从头来过
// 一般不成立..只有在并发情况下才有可能成立
if (level <= oldLevel) // lost race to add level
break;
// 新的最高层头索引节点(newh最终指向新的headIndex,要给baseHeader提升索引),目前指向旧的最高层头索引节点
HeadIndex<K,V> newh = h;
// 头节点指向的数据节点
Node<K,V> oldbase = h.node;
// 超出的部分建立新的头索引节点
for (int j = oldLevel+1; j <= level; ++j)
// 正常这个语句只会执行1次
// oldbase:BaseHeader
// newh:down指针↓(原来的最高头索引节点)
// idxs[j]:right指针→(新的最高层索引节点)
// j:level,新level
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
// 执行完for循环之后,baseHeader 索引长这个样子..
// headIndex-4 → index-4 ← idx
// ↓ ↓
// headIndex-3 index-3
// ↓ ↓
// headIndex-2 index-2
// ↓ ↓
// headIndex-1 index-1
// ↓ ↓
// baseHeader .... Node(z)
// 但是headIndex-3、2、1还没有跟右边的索引节点串起来
// 原子更新头索引节点
// cas成功后,skipListMap.head 字段指向 最新的 headIndex,即上图中 baseHeader 的左上角 index-4 节点。
if (casHead(h, newh)) {
// h指向新的最高层头索引节点
h = newh;
// 把level赋值为旧的最高层级的
// idx指向的不是新的最高的索引节点了
// 而是与旧最高层平齐的索引节点
// 因为要开始往下处理底层的索引节点了(它们还没串联起来)
idx = idxs[level = oldLevel];
break;
}
}
}
// 经过上面的步骤,有两种情况
// 1.没有超出高度,新建一条目标节点的索引节点链
// 2.超出了高度,新建一条目标节点的索引节点链,同时最高层头索引节点同样往上长
/*
* Part III:将新建的索引节点(包含头索引节点)与其它索引节点通过右指针连接在一起
*/
// 这时level是等于旧的最高层级的,自旋
// insertionLevel 代表 z-node 尚未处理 队列关系的 层级..
splice: for (int insertionLevel = level;;) {
// h为最高头索引节点
int j = h.level;
// 从头索引节点开始遍历
// 为了方便,这里叫q为当前节点,r为右节点,d为下节点,t为目标节点相应层级的索引
for (Index<K,V> q = h, r = q.right, t = idx;;) {
// 如果遍历到了最右边,或者最下边,
// 也就是遍历到头了,则退出外层循环
if (q == null || t == null)
break splice;
// 如果右节点不为空
if (r != null) {
// n是右节点的数据节点,为了方便,这里直接叫右节点的值
Node<K,V> n = r.node;
// 比较目标key与右节点的值
int c = cpr(cmp, key, n.key);
// 如果右节点的值为空了,则表示此节点已删除
if (n.value == null) {
// 则把右节点删除
if (!q.unlink(r))
// 如果删除失败,说明有其它线程先一步修改了,从头来过
break;
// 删除成功后重新取右节点
r = q.right;
continue;
}
// 如果比较c>0,表示目标节点还要往右
if (c > 0) {
// 则把当前节点和右节点分别右移
q = r;
r = r.right;
continue;
}
}
// 到这里说明已经到当前层级的最右边了
// 如果新生成的索引节点高度大于原最高层索引,这里实际是会先走第二个if
// 第一个if
// j与insertionLevel相等了
// (如果此时j=4,insertionLevel=3)实际是先走的第二个if,j自减后应该与insertionLevel相等
if (j == insertionLevel) {
// 这里是真正连右指针的地方
if (!q.link(r, t))
// 连接失败,从头来过
break; // restart
// t节点的值为空,可能是其它线程删除了这个元素
if (t.node.value == null) {
// 这里会去协助删除元素
findNode(key);
break splice;
}
// 当前层级右指针连接完毕,向下移一层继续连接
// 如果移到了最下面一层,则说明都连接完成了,退出外层循环
if (--insertionLevel == 0)
break splice;
}
// 第二个if
// j先自减1,再与两个level比较
// j、insertionLevel 和 t(idx),三者是对应的,都是还未把右指针连好的那个层级
if (--j >= insertionLevel && j < level)
// t往下移
t = t.down;
// 当前层级到最右边了
// 那只能往下一层级去走了
// 当前节点下移
// 再取相应的右节点
q = q.down;
r = q.right;
}
}
}
return null;
}
// Node.class中的方法,协助删除元素
void helpDelete(Node<K,V> b, Node<K,V> f) {
/*
* Rechecking links and then doing only one of the
* help-out stages per call tends to minimize CAS
* interference among helping threads.
*/
// 这里的调用者this==n,三者关系是b->n->f
if (f == next && this == b.next) {
// 将n的值设置为null后,会先把n的下个节点设置为marker节点
// 这个marker节点的值是它自己
// 这里如果不是它自己说明marker失败了,重新marker
if (f == null || f.value != f) // not already marked
casNext(f, new Node<K,V>(f));
else
// marker过了,就把b的下个节点指向marker的下个节点
b.casNext(this, f.next);
}
}
// Index.class中的方法,删除succ节点
final boolean unlink(Index<K,V> succ) {
// 原子更新当前节点指向下一个节点的下一个节点
// 也就是删除下一个节点
return node.value != null && casRight(succ, succ.right);
}
// Index.class中的方法,在当前节点与succ之间插入newSucc节点
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
// 在当前节点与下一个节点中间插入一个节点
Node<K,V> n = node;
// 新节点指向当前节点的下一个节点
newSucc.right = succ;
// 原子更新当前节点的下一个节点指向新节点
return n.value != null && casRight(succ, newSucc);
}
B、N、F、Z 指针,假设插入的节点为 7。
-
情况1:跳跃表内存在 key 一致元素,做替换
-
情况2:插入新元素,无须给新元素生成索引节点
-
情况3:插入新元素,需要给新元素生成索引节点,且索引高度 < maxLevel(代码中的 case1)
-
情况4:插入新元素,需要给新元素生成索引节点,且索引高度 > maxLevel(代码中的 case2)
我们这里把整个插入过程分成三个部分:
Part I:找到目标节点的位置并插入
(1)这里的目标节点是数据节点,也就是最底层的那条链;
(2)寻找目标节点之前最近的一个索引对应的数据节点(数据节点都是在最底层的链表上);
(3)从这个数据节点开始往后遍历,直到找到目标节点应该插入的位置;
(4)如果这个位置有元素,就更新其值(onlyIfAbsent = false);
(5)如果这个位置没有元素,就把目标节点插入;
(6)至此,目标节点已经插入到最底层的数据节点链表中了;
Part II:随机决定是否需要建立索引及其层次,如果需要则建立自上而下的索引
(1)取个随机数 rnd,计算 (rnd & 0x80000001)
;
(2)如果不等于 0,结束插入过程,也就是不需要创建索引,返回;
(3)如果等于 0,才进入创建索引的过程(只要正偶数才会等于 0);
(4)计算 while (((rnd >>>= 1) & 1) != 0)
,决定层级数,level 从 1 开始;
(5)如果算出来的层级不高于现有最高层级,则直接建立一条竖直的索引链表(只有 down 有值),并结束 Part II;
(6)如果算出来的层级高于现有最高层级,则新的层级只能比现有最高层级多 1;
(7)同样建立一条竖直的索引链表(只有 down 有值);
(8)将头索引也向上增加到相应的高度,结束 Part II;
(9)也就是说,如果层级不超过现有高度,只建立一条索引链,否则还要额外增加头索引链的高度(脑补一下,后面举例说明);
Part III:将新建的索引节点(包含头索引节点)与其它索引节点通过右指针连接在一起(补上 right 指针)
(1)从最高层级的头索引节点开始,向右遍历,找到目标索引节点的位置;
(2)如果当前层有目标索引,则把目标索引插入到这个位置,并把目标索引前一个索引向下移一个层级;
(3)如果当前层没有目标索引,则把目标索引位置前一个索引向下移一个层级;
(4)同样地,再向右遍历,寻找新的层级中目标索引的位置,回到第(2)步;
(5)依次循环找到所有层级目标索引的位置并把它们插入到横向的索引链表中;
总结起来,一共就是三大步:
(1)插入目标节点到数据节点链表中;
(2)建立竖直的 down 链表;
(3)建立横向的 right 链表;
添加元素举例
假设初始链表是这样:
假如,我们现在要插入一个元素 9。
(1)寻找目标节点之前最近的一个索引对应的数据节点,在这里也就是找到了 5 这个数据节点;
(2)从 5 开始向后遍历,找到目标节点的位置,也就是在 8 和 12 之间;
(3)插入 9 这个元素,Part I 结束;
然后,计算其索引层级,假如是 3,也就是 level = 3。
(1)建立竖直的 down 索引链表;
(2)超过了现有高度 2,还要再增加 head 索引链的高度;
(3)至此,Part II 结束;
最后,把 right 指针补齐。
(1)从第 3 层的 head 往右找当前层级目标索引的位置;
(2)找到就把目标索引和它前面索引的 right 指针连上,这里前一个正好是 head;
(3)然后前一个索引向下移,这里就是 head 下移;
(4)再往右找目标索引的位置;
(5)找到了就把 right 指针连上,这里前一个是 3 的索引;
(6)然后 3 的索引下移;
(7)再往右找目标索引的位置;
(8)找到了就把 right 指针连上,这里前一个是 5 的索引;
(9)然后 5 下移,到底了,Part III 结束,整个插入过程结束;
findPredecessor()
// 寻找目标节点之前最近的一个索引对应的数据节点
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
// key不能为空
if (key == null)
throw new NullPointerException(); // don't postpone errors
// 自旋
for (;;) {
// 从最高层头索引节点开始查找,先向右,再向下
// 直到找到目标位置之前的那个索引
for (Index<K,V> q = head, r = q.right, d;;) {
// 如果右节点不为空
if (r != null) {
// 右节点对应的数据节点,为了方便,我们叫右节点的值
Node<K,V> n = r.node;
K k = n.key;
// 如果右节点的value为空
// 说明其它线程把这个节点标记为删除了
// 则协助删除
if (n.value == null) {
if (!q.unlink(r))
// 如果删除失败
// 说明其它线程先删除了,从头来过
break; // restart
// 删除之后重新读取右节点
r = q.right; // reread r
continue;
}
// 如果目标key比右节点还大,继续向右寻找
if (cpr(cmp, key, k) > 0) {
// 往右移
q = r;
// 重新取右节点
r = r.right;
continue;
}
// 如果c<0,说明不能再往右了
}
// 到这里说明当前层级已经到最右了
// 两种情况:一是r==null,二是c<0
// 再从下一级开始找
// 如果没有下一级了,就返回这个索引对应的数据节点
if ((d = q.down) == null)
return q.node;
// 往下移
q = d;
// 重新取右节点
r = d.right;
}
}
}
删除元素 remove() => doRemove()
- 删除指定 key 对应的元素
public V remove(Object key) {
return doRemove(key, null);
}
||
\/
final V doRemove(Object key, Object value) {
// key不为空
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// 自旋
outer: for (;;) {
// 寻找目标节点之前的最近的索引节点对应的数据节点
// 为了方便,这里叫b为当前节点,n为下一个节点,f为下下个节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
// 整个链表都遍历完了也没找到目标节点,退出外层循环
if (n == null)
break outer;
// 下下个节点
Node<K,V> f = n.next;
// 再次检查
// 如果n不是b的下一个节点了
// 说明有其它线程先一步修改了,从头来过
if (n != b.next) // inconsistent read
break;
// 如果下个节点的值为null了
// 说明有其它线程标记该元素为删除状态了
if ((v = n.value) == null) { // n is deleted
// 协助删除
n.helpDelete(b, f);
break;
}
// 如果b的值为空或者v等于n,说明b已被删除
// 这时候n就是marker节点,那b就是被删除的那个
if (b.value == null || v == n) // b is deleted
break;
// 如果c<0,说明没找到元素,退出外层循环
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
// 如果c>0,说明还没找到,继续向右找
if (c > 0) {
// 当前节点往后移
b = n;
// 下一个节点往后移
n = f;
continue;
}
// c=0,说明n就是要找的元素
// 如果value不为空且不等于找到元素的value,不需要删除,退出外层循环(一般传进来的value都为null 这里不会成立)
if (value != null && !value.equals(v))
break outer;
// 如果value为空,或者相等
// 原子标记n的value值为空
if (!n.casValue(v, null))
// 如果删除失败,说明其它线程先一步修改了,从头来过
break;
// P.S.到了这里n的值肯定是设置成null了
// 关键!!!!
// 让n的下一个节点指向一个market节点
// 这个market节点的key为null,value为marker自己,next为n的下个节点f
// 或者让b的下一个节点指向下下个节点
// 注意:这里是或者||,因为两个CAS不能保证都成功,只能一个一个去尝试
// 这里有两层意思:
// 1.如果标记market成功,再尝试将b的下个节点指向下下个节点,如果第二步失败了,进入条件,如果成功了就不用进入条件了
// 2.如果标记market失败了,直接进入条件
if (!n.appendMarker(f) || !b.casNext(n, f))
// 通过findNode()重试删除(里面有个helpDelete()方法)
findNode(key); // retry via findNode
else {
// 上面两步操作都成功了,才会进入这里,不太好理解,上面两个条件都有非"!"操作
// 说明节点已经删除了,通过findPredecessor()方法删除索引节点
// findPredecessor()里面有unlink()操作,将删除节点的索引与其它索引断开
//(核心就是判断当前node节点的value是否为null,为null则将这个索引出队)
findPredecessor(key, cmp); // clean index
// 如果最高层头索引节点没有右节点,则跳表的高度降级
if (head.right == null)
tryReduceLevel();
}
// 返回删除的元素值
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
- 设置指定元素 value 为 null
- 将指定 node 从 node 链表移除
- 将指定 node 的 index 节点 从 对应的 index 链表移除
(1)寻找目标节点之前最近的一个索引对应的数据节点(数据节点都是在最底层的链表上);
(2)从这个数据节点开始往后遍历,直到找到目标节点的位置;
(3)如果这个位置没有元素,直接返回 null,表示没有要删除的元素;
(4)如果这个位置有元素,先通过n.casValue(v, null)
原子更新把其 value 设置为 null;
(5)通过n.appendMarker(f)
在当前元素后面添加一个 marker 元素标记当前元素是要删除的元素;
(6)通过b.casNext(n, f)
尝试删除元素;
(7)如果上面两步中的任意一步失败了都通过findNode(key)
中的n.helpDelete(b, f)
再去不断尝试删除;
(8)如果上面两步都成功了,再通过findPredecessor(key, cmp)
中的q.unlink(r)
删除索引节点;
(9)如果 head 的 right 指针指向了 null,则跳表高度降级;
删除元素举例
假如初始跳表如下图所示,我们要删除 9 这个元素。
(1)找到 9 这个数据节点;
(2)把 9 这个节点的 value 值设置为 null;
(3)在 9 后面添加一个 marker 节点,标记 9 已经删除了;
(4)让 8 指向 12;
(5)把索引节点与它前一个索引的 right 断开联系;
(6)跳表高度降级;
至于,为什么要有(2)(3)(4)这么多步骤呢,因为多线程下如果直接让 8 指向 12,可以其它线程先一步在 9 和 12 间插入了一个元素 10 呢,这时候就不对了。
所以这里搞了三步来保证多线程下操作的正确性。
如果第(2)步失败了,则直接重试;
如果第(3)或(4)步失败了,因为第(2)步是成功的,则通过 helpDelete() 不断重试去删除;
其实 helpDelete() 里面也是不断地重试(3)和(4);
只有这三步都正确完成了,才能说明这个元素彻底被删除了。
这一块结合上面图中的红绿蓝色好好理解一下,一定要想在并发环境中会怎么样。
查找元素 get() => doGet()
- 查询指定 key 对应的 value
public V get(Object key) {
return doGet(key);
}
||
\/
private V doGet(Object key) {
// key不为空
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// 自旋
outer: for (;;) {
// 寻找目标节点之前最近的索引对应的数据节点
// 为了方便,这里叫b为当前节点,n为下个节点,f为下下个节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
// 如果链表到头还没找到元素,则跳出外层循环
if (n == null)
break outer;
// 下下个节点
Node<K,V> f = n.next;
// 如果不一致读,从头来过
if (n != b.next) // inconsistent read
break;
// 如果n的值为空,说明节点已被其它线程标记为删除
if ((v = n.value) == null) { // n is deleted
// 协助删除,再重试
n.helpDelete(b, f);
break;
}
// 如果b的值为空或者v等于n,说明b已被删除
// 这时候n就是marker节点,那b就是被删除的那个
if (b.value == null || v == n) // b is deleted
break;
// 如果c==0,说明找到了元素,就返回元素值
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
// 如果c<0,说明没找到元素
if (c < 0)
break outer;
// 如果c>0,说明还没找到,继续寻找
// 当前节点往后移
b = n;
// 下一个节点往后移
n = f;
}
}
return null;
}
(1)寻找目标节点之前最近的一个索引对应的数据节点(数据节点都是在最底层的链表上);
(2)从这个数据节点开始往后遍历,直到找到目标节点的位置;
(3)如果这个位置没有元素,直接返回 null,表示没有找到元素;
(4)如果这个位置有元素,返回元素的 value 值;
查找元素举例
假如有如下图所示这个跳表,我们要查找 9 这个元素,它走过的路径是怎样的呢?可能跟你相像的不一样…
(1)寻找目标节点之前最近的一个索引对应的数据节点,这里就是 5;
(2)从 5 开始往后遍历,经过 8,到 9;
(3)找到了返回;
整个路径如下图所示:
是不是很操蛋?
为啥不从 9 的索引直接过来呢?
从我实际打断点调试来看确实是按照上图的路径来走的。
我猜测可能是因为 findPredecessor() 这个方法是插入、删除、查找元素多个方法共用的,在单链表中插入和删除元素是需要记录前一个元素的,而查找并不需要,这里为了兼容三者使得编码相对简单一点,所以就使用了同样的逻辑,而没有单独对查找元素进行优化。
总结
- ConcurrentSkipListMap 是
空间换时间
的一种典型应用。 - ConcurrentSkipListMap 就是 java 中跳跃表的实现,只不过加了一些并发处理。
彩蛋
为什么 Redis 选择使用跳表而不是红黑树来实现有序集合?
首先,我们来分析下 Redis 的有序集合支持的操作:
- 插入元素
- 删除元素
- 查找元素
- 有序输出所有元素
- 查找区间内所有元素
其中,前 4 项红黑树都可以完成,且时间复杂度与跳表一致。
但是,最后一项,红黑树的效率就没有跳表高了。
在跳表中,要查找区间的元素,我们只要定位到两个区间端点在最低层级的位置,然后按顺序遍历元素就可以了,非常高效。
而红黑树只能定位到端点后,再从首位置开始每次都要查找后继节点,相对来说是比较耗时的。
此外,跳表实现起来很容易且易读,红黑树实现起来相对困难,所以 Redis 选择使用跳表来实现有序集合。
参考
- 视频参考
- b站_小刘讲源码【硬核教程】Java 跳跃表源码分析(skipList),彻底搞定跳跃表有关全部面试问题!
- 文章参考
- 彤哥读源码_死磕 java集合之ConcurrentSkipListMap源码分析——发现个bug