目录
为什么使用快速列表quicklist
对比双向链表
对比压缩列表ziplist
quicklist结构
节点结构quicklistNode
quicklist
管理ziplist信息的结构quicklistEntry
迭代器结构quicklistIter
quicklist的API
1.创建快速列表
2.创建快速列表节点
3.头插quicklistPushHead 和尾插quicklistPushTail
4.特定位置插入元素(不是节点)
5.删除元素
6.查找元素
总结 quicklist的特性
为什么使用快速列表quicklist
在 Redis 的早期设计中,如果列表类型的对象中元素的长度较小或数量比较少的,就采用压缩列表来存储,反之则使用双向链表。
对比双向链表
双向链表便于在链表的两端进行插入和删除操作,在插入节点上复杂度很低,但是它的内存开销比较大,每个节点除了要保存数据之外,还要额外保存两个指针,并且双向链表的各个节点是单独的内存块,地址不连续,容易产生内存碎片。
对比压缩列表ziplist
压缩列表存储在一段连续的内存上,所以存储效率高。但是,它每次变更的时间复杂度都比较高,插入和删除操作需要频繁的申请和释放内存,如果压缩列表长度很长,一次 realloc
可能会导致大批量的数据拷贝。
如何保留ziplist的空间高效性,又能不让其更新复杂度过高?
Redis 在 3.2 版本之后引入了快速列表,列表类型的对象其底层都是由快速列表实现。快速列表是双向链表和压缩列表的混合体,它将双向链表按段切分,每一段都使用压缩列表来紧凑存储,多个压缩列表之间使用双向指针关联起来。
quicklist结构
quicklist是个双端链表,节点结构是quicklistNode,节点的zl字段指向压缩列表。
节点结构quicklistNode
quicklistNode是快速列表的节点。
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl; //指向ziplist
unsigned int sz; /* ziplist size in bytes */
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
- prev:前驱节点指针。
- next:后驱节点指针。
- zl:数据指针,如果当前节点的数据没有压缩,则指向一个 ziplist 结构,否则指向一个 quicklistLZF 结构。
- sz:表示zl指向的数据总大小。注意,若数据被压缩,其表示压缩前的数据长度大小。
- count:占16bit,表示当前节点的ziplist的entry的个数
- encoding:占2bit,表示当前节点的数据是否被压缩。1表示没有压缩;2是压缩,用的是LZF算法。
- container:是一个预留字段,本来设计是用来表明一个quicklist节点下面是直接存数据,还是使用ziplist存数据,或者用其它的结构来存数据(用作数据容器,所以叫container)。目前这个值是一个固定的值2,表示使用 ziplist 作为数据容器。
recompress
:当我们使用类似lindex
这样的命令查看了某一项本来压缩的数据时,需要把数据暂时解压,这时就设置recompress=1
做一个标记,等有机会再把数据重新压缩。extra
:其它扩展字段。目前Redis的实现里也没用上。
quicklist
快速列表的结构,从其结构可以看出其是一个链表,保存了头尾节点。
#if UINTPTR_MAX == 0xffffffff
/* 32-bit */
# define QL_FILL_BITS 14
# define QL_COMP_BITS 14
# define QL_BM_BITS 4
#elif UINTPTR_MAX == 0xffffffffffffffff
/* 64-bit */
# define QL_FILL_BITS 16
# define QL_COMP_BITS 16
# define QL_BM_BITS 4 /* we can encode more, but we rather limit the user
since they cause performance degradation. */
#else
# error unknown arch bits count
#endif
//上面的表示:QL_FILL_BITS值在32位机器上是14,64位机器上是16
typedef struct quicklist {
quicklistNode *head; //头节点
quicklistNode *tail; //尾结点
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
int fill : QL_FILL_BITS; /* fill factor for individual nodes */
unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;
//当指定使用lzf压缩算法压缩ziplist的entry节点时,quicklistNode结构的zl成员指向quicklistLZF结构
typedef struct quicklistLZF {
//表示被LZF算法压缩后的ziplist的大小
unsigned int sz; /* LZF size in bytes*/
//保存压缩后的ziplist的数组,柔性数组
char compressed[];
} quicklistLZF;
/* quicklist node encodings */
#define QUICKLIST_NODE_ENCODING_RAW 1
#define QUICKLIST_NODE_ENCODING_LZF 2
/* quicklist compression disable */
#define QUICKLIST_NOCOMPRESS 0
/* quicklist container formats */
#define QUICKLIST_NODE_CONTAINER_NONE 1
#define QUICKLIST_NODE_CONTAINER_ZIPLIST 2
#define quicklistNodeIsCompressed(node) \
((node)->encoding == QUICKLIST_NODE_ENCODING_LZF)
- count:所有压缩列表的节点数量之和
- len:快速类别的节点数量
- fill:存放
list-max-ziplist-size
参数的值,用于设置每个quicklistnode的压缩列表的所有entry的总长度大小。其值默认是-2,表示每个quicklistNode节点的ziplist所占字节数不能超过8kb。若是任意正数:,表示ziplist结构所最多包含的entry个数,最大为215215。 - compress:存放
list-compress-depth
参数的值,用于设置压缩深度。快速列表默认的压缩深度为 0,即不压缩。为了支持快速的 push/pop 操作,快速列表的首尾两个节点不压缩,此时压缩深度就是1。若压缩深度为2,表示快速列表的首尾第一个及第二个节点都不压缩。 bookmark_count
:占 4 bit,bookmarks
数组的长度。bookmarks
:这是一个可选字段,快速列表重新分配内存时使用,不使用时不占用空间。
管理ziplist信息的结构quicklistEntry
和压缩列表一样,entry结构在储存时是一连串的内存块,需要将其每个entry节点的信息读取到管理该信息的结构体中,以便操作。
//管理quicklist中quicklistNode节点中ziplist信息的结构
typedef struct quicklistEntry {
const quicklist *quicklist; //指向所属的quicklist的指针
quicklistNode *node; //指向所属的quicklistNode节点的指针
unsigned char *zi; //指向当前ziplist结构的指针
unsigned char *value; //查找到的元素如果是字符串,则存在value字段
long long longval; //查找到的元素如果是整数,则存在longval字段
unsigned int sz; //保存当前元素的长度
int offset; //保存查找到的元素距离压缩列表头部/尾部隔了多少个节点
} quicklistEntry;
迭代器结构quicklistIter
在Redis的quicklist结构中,实现了自己的迭代器,用于遍历节点。
//quicklist的迭代器结构
typedef struct quicklistIter {
const quicklist *quicklist; //指向所属的quicklist的指针
quicklistNode *current; //指向当前迭代的quicklist节点的指针
unsigned char *zi; //指向当前quicklist节点中迭代的ziplist
long offset; //当前ziplist结构中的偏移量
int direction; //迭代方向
} quicklistIter;
quicklist的API
1.创建快速列表
创建快速列表,快速列表的成员变量都使用默认值
/* Create a new quicklist.
* Free with quicklistRelease(). */
//创建快速列表,并对各个字段进行初始化
quicklist *quicklistCreate(void) {
struct quicklist *quicklist;
quicklist = zmalloc(sizeof(*quicklist));
quicklist->head = quicklist->tail = NULL;
quicklist->len = 0;
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2;
quicklist->bookmark_count = 0;
return quicklist;
}
创建列表,传入自己设置的参数
//设置压缩深度
#define COMPRESS_MAX ((1 << QL_COMP_BITS)-1)
void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
if (compress > COMPRESS_MAX) {
compress = COMPRESS_MAX;
} else if (compress < 0) {
compress = 0;
}
quicklist->compress = compress;
}
//设置压缩列表的大小(成员fill),即是每个压缩列表的总entry的总长度大小
#define FILL_MAX ((1 << (QL_FILL_BITS-1))-1)
void quicklistSetFill(quicklist *quicklist, int fill) {
if (fill > FILL_MAX) {
fill = FILL_MAX;
} else if (fill < -5) {
fill = -5;
}
quicklist->fill = fill;
}
//设置快速列表的参数
void quicklistSetOptions(quicklist *quicklist, int fill, int depth) {
quicklistSetFill(quicklist, fill);
quicklistSetCompressDepth(quicklist, depth);
}
//通过一些默认参数创建快速列表,就是调用了前面这些封装好的函数
quicklist *quicklistNew(int fill, int compress) {
quicklist *quicklist = quicklistCreate();
quicklistSetOptions(quicklist, fill, compress);
return quicklist;
}
2.创建快速列表节点
REDIS_STATIC quicklistNode *quicklistCreateNode(void) {
quicklistNode *node;
node = zmalloc(sizeof(*node));
node->zl = NULL;
node->count = 0;
node->sz = 0;
node->next = node->prev = NULL;
node->encoding = QUICKLIST_NODE_ENCODING_RAW; //默认不压缩
node->container = QUICKLIST_NODE_CONTAINER_ZIPLIST; //默认使用压缩列表结构来存数据
node->recompress = 0;
return node;
}
3.头插quicklistPushHead
和尾插quicklistPushTail
/* 头部插入
* 如果在已存在节点插入,返回0
* 如果是在新的头结点插入,返回1 */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
//判断头节点的空间是否足够容纳要添加的元素
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); // 在头结点对应的ziplist中插入
quicklistNodeUpdateSz(quicklist->head);
} else { // 否则新建一个头结点,然后插入数据
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(node);
//新增元素添加进这个新的快速列表节点里
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}
/* 尾部插入。
* 如果在已存在节点插入,返回0
* 如果是在新的头结点插入,返回1 */
int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_tail = quicklist->tail;
if (likely(
_quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {
quicklist->tail->zl =
ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL);
quicklistNodeUpdateSz(quicklist->tail);
} else {
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);
quicklistNodeUpdateSz(node);
_quicklistInsertNodeAfter(quicklist, quicklist->tail, node);
}
quicklist->count++;
quicklist->tail->count++;
return (orig_tail != quicklist->tail);
}
头插和尾插都是先调用了_quicklistNodeAllowInsert来判断能否在当前头/尾节点插入。如果能插入就直接插入到对应的ziplist中,否则就需要新建一个新节点再进行操作。
前面讲解过的quicklist结构的fill字段,其实_quicklistNodeAllowInsert就是根据fill的值来判断是否已经超过最大容量的。
其中使用到函数_quicklistInsertNodeBefore 和 _quicklistInsertNodeBefore,这两个就是在指定位置插入元素。
4.特定位置插入元素(不是节点)
注意:我们使用Redis,接触到的快速列表插入的都是插入元素,不是插入快速列表的节点。
插入元素会使用到结构体quicklistEntry。
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz) {
_quicklistInsert(quicklist, entry, value, sz, 0);
}
void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz) {
_quicklistInsert(quicklist, entry, value, sz, 1);
}
/* 在一个已经存在的entry前面或者后面插入一个新的entry
* 如果after==1表示插入到后面,否则是插入到前面 */
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz, int after) {
int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
int fill = quicklist->fill;
//1. 获取插入位置的quicklist的节点,通过entry的node字段
quicklistNode *node = entry->node;
quicklistNode *new_node = NULL;
assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
//2. 该节点不存在,也只有当快速列表的头或尾节点存在才会进入这个if条件
if (!node) {
/* we have no reference node, so let's create only node in the list */
D("No node given!");
new_node = quicklistCreateNode();
//将添加的元素插入快速列表节点对应的压缩列表的节点头部
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
__quicklistInsertNode(quicklist, NULL, new_node, after);
new_node->count++;
quicklist->count++;
return;
}
//3. 判断node节点对应的压缩列表是否能够容纳得下要添加的元素,即node节点是否已满
if (!_quicklistNodeAllowInsert(node, fill, sz)) {
D("Current node is full with count %d with requested fill %lu",
node->count, fill);
//表示当前节点的数量已满
full = 1;
}
//4. 判断是否需要是在尾部添加
if (after && (entry->offset == node->count)) {
//表示在尾部添加
at_tail = 1;
if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
D("Next node is full too.");
//表示下一quicklistNode的ziplist的entry已满了
full_next = 1;
}
}
//5.判断是否在头部添加
if (!after && (entry->offset == 0)) {
D("At Head");
at_head = 1;
if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
D("Prev node is full too.");
//表示前一节点已满
full_prev = 1;
}
}
//未完待续..........,后面再讲解
}
- 1.通过通过entry的node字段获取插入位置的quicklist的节点
- 2.判断该节点是否存在,若不存在,就创建节点,并创建ziplist,插入该元素到ziplist
- 3.判断node节点对应的压缩列表是否能够容纳得下要添加的元素,即node节点是否已满,用来设置变量full
- 4.判断是否在该quicklistNode的ziplist的尾部添加,并判断该quicklistNode的下一节点的ziplist是否已满
- 5.判断是否在该quicklistNode的ziplist的头部添加,并判断该quicklistNode的前驱节点的ziplist是否已满
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz, int after) {
//..........................................
/* Now determine where and how to insert the new element */
if (!full && after) {
//6. 当前节点的zipList不满,并且是在当前位置的后面插入
D("Not full, inserting after current position.");
quicklistDecompressNodeForUse(node); //当前节点解压缩
//entry->zi是ziplist的一个entry,返回entry->zi的下一个ziplist的entry
unsigned char *next = ziplistNext(node->zl, entry->zi);
if (next == NULL) {
node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
} else {
node->zl = ziplistInsert(node->zl, next, value, sz);
}
node->count++;
quicklistNodeUpdateSz(node);
//添加完元素后再根据node->recompress判断是否对压缩列表进行压缩
quicklistRecompressOnly(quicklist, node);
} else if (!full && !after) {
//7. 当前节点的ziplist不满,在当前entry的前面插入
D("Not full, inserting before current position.");
quicklistDecompressNodeForUse(node);
node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
} else if (full && at_tail && node->next && !full_next && after) {
/* If we are: at tail, next has free space, and inserting after:
* - insert entry at head of next node. */
//8.
D("Full and tail, but next isn't full; inserting next node head");
new_node = node->next;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && at_head && node->prev && !full_prev && !after) {
/* If we are: at head, previous has free space, and inserting before:
* - insert entry at tail of previous node. */
//9.
D("Full and head, but prev isn't full, inserting prev node tail");
new_node = node->prev;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && ((at_tail && node->next && full_next && after) ||
(at_head && node->prev && full_prev && !after))) {
/* If we are: full, and our prev/next is full, then:
* - create new node and attach to quicklist */
//10.
D("\tprovisioning new node...");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
} else if (full) {
/* else, node is full we need to split it. */
/* covers both after and !after cases */
D("\tsplitting node...");
//11
quicklistDecompressNodeForUse(node);
new_node = _quicklistSplitNode(node, entry->offset, after);
new_node->zl = ziplistPush(new_node->zl, value, sz,
after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
_quicklistMergeNodes(quicklist, node);
}
quicklist->count++;
}
这部分主要是分了几种情况来插入:
- 6.当前节点的ziplist没满,并在当前entry的后面插入
- 7.当前节点的ziplist没满,并在当前entry的前面插入
- 8.当前节点的ziplist已满、要添加在尾部、并且后移节点是存在的、后一节点的ziplist没满,那就添加到后一节点对应的ziplist的第一个entry的前面
- 9.当前节点的ziplist已满、要添加在头部、并且前一节点存在、前一节点的ziplist没满,就添加到前一节点的ziplist的尾部。
- 10.当前节点的ziplist已满、插入的位置是在头/尾的、并且当前节点的前/后节点的ziplist已满,则需要创建新的quicklistNode来存放要放的元素。
- 11.当前节点的ziplist已满,但是插入的位置不在两端的,则要从插入位置把当前节点分裂成两个节点
5.删除元素
快速列表删除元素有两个函数 quicklistDelEntry
和 quicklistDelRange
,分别是删除单个元素和删除某个区间的元素。
删除元素使用到了迭代器结构quicklistIter,需要更新迭代器对应的节点等信息。
/* Delete one element represented by 'entry'*/
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
quicklistNode *prev = entry->node->prev;
quicklistNode *next = entry->node->next;
//删除元素,返回值 deleted_node 表示当前节点是否要删除。1表示该节点已删除
int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
entry->node, &entry->zi);
/* after delete, the zi is now invalid for any future usage. */
iter->zi = NULL;
/* If current node is deleted, we must update iterator node and offset. */
if (deleted_node) {
if (iter->direction == AL_START_HEAD) {
iter->current = next;
iter->offset = 0;
} else if (iter->direction == AL_START_TAIL) {
iter->current = prev;
iter->offset = -1;
}
}
}
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
unsigned char **p) {
int gone = 0;
//删除 node 节点对应的压缩列表 p 位置的entry,返回新的zipList
node->zl = ziplistDelete(node->zl, p);
node->count--;
if (node->count == 0) {
gone = 1;
__quicklistDelNode(quicklist, node);//当前节点的ziplist的entry个数为0,就删除该节点
} else {
quicklistNodeUpdateSz(node); //更新该节点的ziplist的总长度大小
}
quicklist->count--;
/* If we deleted the node, the original node is no longer valid */
return gone ? 1 : 0;
}
6.查找元素
Redis中没有提供直接查找元素的API。查找元素是通过遍历查找的,这就需要通过上面所讲的迭代器quicklistIter。
quicklistIter *iter = quicklistGetIterator(ql, AL_START_HEAD);
quicklistEntry entry;
int i = 0;
while (quicklistNext(iter, &entry)) { //获取迭代器中的下一个元素
//比较当前的压缩列表节点存储的元素与所要查找的是否相同
if (quicklistCompare(entry.zi, (unsigned char *)"bar", 3)) {
//进行一些操作...........
}
i++;
}
总结 quicklist的特性
- 其是一个节点为ziplist的双端链表
- 节点采用了ziplist,解决了传统链表的内存占用和易产生内存碎片问题
- 对比单个ziplist,quicklist使用了多个ziplist,那每个ziplist的entry个数就可以控制的比较小,解决了连续空间申请效率和ziplist变更的时间复杂度过于大的问题
- 中间节点可以压缩,进一步节省了内存