写在前面
以下内容是基于Redis 6.2.6 版本整理总结
一、跳表(skiplist)
如何理解跳表?在了解跳表之前,我们先从普通链表开始,一点点揭开跳表的神秘面纱~
首先,普通单链表来说,即使链表是有序的,我们要查找某个元素,也需要从头到尾遍历整个链表。这样效率很低,时间复杂度是O(n)。
那么有没有方法提升查询效率呢?我们可以尝试为链表建立“索引”来提升查询效率。如下图,我们在原始链表的基础上,每两个元素提取一个索引,down指向原始链表的节点:
此时,假如我们要查询值为19的节点,我们从索引层开始遍历,当遍历到16时,下个节点的值为23,所以,19一定在这两个节点之间。我们通过16节点的down指针来到原始链表,将继续遍历,直到找到值为19的节点。在没有建“索引”之前,我们需要遍历8次,才能找到19,而在建立“索引”后,需要6次就能找到,也就是,索引帮我们减少了查询的次数。
那如果我们再建一级索引呢?哈哈哈,没想到吧也是6次,这是因为我们的数据量太少,即便加了两级索引,优化效果也不是很明显。在数据量大时,优化效果还是很明显的,有兴趣可以自己动手画一画。
1.1 跳表的时间复杂度
假设链表有n个节点,每两个节点生成一个索引,则有第一层索引节点的个数为n/2,第二层索引节点的个数是第一层个数的一半n/4,以此类推,第h层节点的个数就是n/(2^h)。假设,第h层有两个节点,则:h = log2n - 1,再算上原始链表,则整个跳表的高度就是log2n。
我们在查询某个数据的时候,每一层需要遍历m个节点,那么在跳表中查询某个数据的时间复杂度就是:O(m*log2n)。那m是多少呢? 按照上面每两个节点上升一个索引节点的索引结构,我们每一层索引最多遍历3个节点,为什么呢?解释如下:
假设我们查找的还是19,在第k层索引中,当我们遍历到11时,发现19在11和23之间,我们通过11的down节点,来到第k-1层。在第k-1层索引中,11 到 23 最多包含3个节点(包含11 和 23 的节点),所以在第k-1索引,我们最多需要遍历3个节点,依次类推,每一层索引都最多只需要遍历3个节点。
通过上面的分析,我们知道了m = 3,也就是说在跳表中查询任意节点的时间复杂度是O(3*log2n),去掉常数项后,时间复杂度就是:O(log2n)。这个查找跟二分查找的时间复杂度一样。换句话说,我们是基于单链表实现了二分查找,神奇吧。但是,这种查询效率的提升是有代价的,也就是我们需要维护多层级索引,才能实现。这也是一种空间换时间的思路。
1.2 空间复杂度
要实现log2n的时间复杂度,跳表就需要额外存储这些索引的空间。那么,需要多大的空间呢?我们来分析一下:
假设原始链表有n个节点,按照每连个节点上升一个索引节点的索引结构,第一层有n/2,第二层n/4,依次类推,第h层有n/2^h个索引节点。假设第h层有2个节点。则总共有:
n/2 + n/4 + n/8 + … + 2 = n-2。所以,跳表的空间复杂度是O(n)。
也就是说,如果将含有n个节点的链表构造成跳表,我们还需要额外再用接近n个节点来存储这些索引,还有没有办法较少索引占的空间呢?答案是有的,上面的分析是基于每两个节点上升一个索引节点,那么换成3个、5个呢?如果为3,也很好分析,需要的索引总数为:n/3 + n/9 + n/27 + … + 3 + 1 = n/2。尽管空间复杂还是O(n),但实际上索引的数量已经减少了一半了。
在实际开发中,原始链表中的对象可能是很大的对象,而索引节点只是存储关键的值和指针,相较于原始节点,大小可以忽略不计。
1.3 跳表的插入和删除
我们想在跳表中插入和删除一个节点,第一步是要找到插入和删除的位置,然后再执行插入或者删除,因为跳表的查询时间复杂度是O(log2n),插入和删除的时间复杂度也是O(log2n)。
1.3.1 插入
1.3.2 删除
删除操作就需要注意一下,如果删除的节点也存在于索引节点中,那么,索引中的节点也要删除。单链表中的删除,需要拿到前驱节点的指针,如果是双向链表就不用考虑了。
1.4 跳表索引的动态更新
当我们一直往跳表中添加元素,如果不更新索引就可能出现,某2个索引之间的索引数过多,极端情况下,会退化为单向链表。
作为一种动态数据结构,我们需要某种手段作为索引节点和原始链表大小的平衡,也就是说,当链表中的节点数增多时,也响应的增加一些索引节点,避免复杂度的退化。红黑树和AVL树是通过左旋和右旋来维持左右子树的平衡。跳表则是通过随机函数来维护这种平衡。
2、跳表在Redis中的应用
有序集合 zet 的底层实现就是跳表。大部分情况下,跳表的效率可以和平衡树媲美,平均时间复杂度O(logn),最坏O(n)。
2.1 跳表源码
每次创建一个新的跳表节点时,会根据幂次定律(越大的数出现的概率越小)随机生成一个介于1到32之间的数作为level数组的大小,这个数组大小就是层的高度。level层数确定源码:
// src/t_zset.c
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
两个宏定义
// src/sever.h
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
server.h
// 996行
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
// sds 对象,唯一的
sds ele;
// 分值
double score;
// 后退指针,用于从后往前遍历使用
struct zskiplistNode *backward;
// 层数
struct zskiplistLevel {
// 前进指针
struct zskiplistNode *forward;
// 跨度,用来确定本节点再链表中的排位 zrank
unsigned long span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
// 指向跳表头节点和尾节点的指针
struct zskiplistNode *header, *tail;
// 跳表中的元素个数,不包含头节点 zcard
unsigned long length;
// 跳表中层数最高的节点的层数
int level;
} zskiplist;
1.3 创建skiplist
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
1.4 跳表的插入和删除
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
// 删除
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
三、总结
- 跳表是有序集合zset的实现之一
- 跳表由zskiplist 和 zskiplistNode两个结构组成,zskiplist保存跳表的信息,如表头和表尾节点、跳表的长度等,zskiplistNode 保存节点详细信息
- 每个跳表节点的层高都是 1~32 之间的随机数
- 跳表中的对象是唯一的
- 跳表中的元素是按照分值从小到大排列,当分值相同时,按照成员对象的大小排序