一前言
上一篇已经说明了B+树的一些原理,也讲到,我们目前采用的持久化数据的方式,而且我们是单独的插入数据,没有任何元数据信息,虽然插入的速度很快,因为是采用追加的方式。但是这种方式插入速度很快,像上次所说,查询和删除的速度会很慢。
我们以前用的就是非排序数组行,保存的是数据,没有其他信息。插入性能最好,但是删除和查找时间复杂度为O(n),排序数组查找很快,可以采用二分法查找,时间复杂度为O(log(n)),但是插入和删除时间复杂度为O(n),而采用B+树方式,且保存了元数据和主键的情况下,无论是查找、插入还是删除,性能都达到了均衡。
二 改造
2.1 元数据信息
采用树的方式来保存数据库的数据时候,就不能是简单的只记录原始信息,还需要记录诸如子节点指针信息, 为了方便遍历到兄弟节点,还需要保存这指向父节点的指针信息(这里面的指针类似c语言的指针,在磁盘保存时候,要看具体的实现,可能是个页号)。同样我们为了区分子节点和叶子节点,需要保存节点的类别,以及是否为root节点这些信息。
用这种排序的树保存数据,还有个好处,就是遍历比较方便。
节点类型
typedef enum { NODE_INTERNAL, NODE_LEAF } NodeType;
root节点元数据
/*
* Common Node Header Layout
*/
// 节点类型数据的大小,其实只有一个bit就可以区分叶子节点和根节点,这里面浪费了点
const uint32_t NODE_TYPE_SIZE = sizeof(uint8_t);
// 节点类型的偏移,放在页节点的开头
const uint32_t NODE_TYPE_OFFSET = 0;
// 是否为root的元数据大小
const uint32_t IS_ROOT_SIZE = sizeof(uint8_t);
// 是否为root的元数据的偏移量
const uint32_t IS_ROOT_OFFSET = NODE_TYPE_SIZE;
// 指向父指针的指针大小
const uint32_t PARENT_POINTER_SIZE = sizeof(uint32_t);
// 指向父指针的偏移量
const uint32_t PARENT_POINTER_OFFSET = IS_ROOT_OFFSET + IS_ROOT_SIZE;
// 整个Node节点的元数据整体尺寸
const uint8_t COMMON_NODE_HEADER_SIZE =
NODE_TYPE_SIZE + IS_ROOT_SIZE + PARENT_POINTER_SIZE;
除了root节点外,就是叶子节点,叶子节点保存完整的数据信息,所以和root节点的内容有所不同。
/*
* Leaf Node Header Layout
*/
// 叶子节点保存的cell数量,一个cell由key和value组成 可以看作一个key后面跟着持久化的行
const uint32_t LEAF_NODE_NUM_CELLS_SIZE = sizeof(uint32_t);
// 叶子节点的cell数量的偏移量
const uint32_t LEAF_NODE_NUM_CELLS_OFFSET = COMMON_NODE_HEADER_SIZE;
// 叶子节点的元数据大小
const uint32_t LEAF_NODE_HEADER_SIZE =
COMMON_NODE_HEADER_SIZE + LEAF_NODE_NUM_CELLS_SIZE;
叶子节点保存的cell数量,一个cell由key和value组成 可以看作一个key后面跟着持久化的行。
示意图很清楚说明了第一个字节是node_type,接着一个字节是is_root,后面四个字节是父节点的指针,再下面4个字节为cell的数量,这里面有个错误就是写了两个,剩下内容就是cell,即key+value,都是这样部署的,Node节点不够存cell的就浪费了。
访问叶子节点方法
// 叶子节点中cell数量地址获取
uint32_t* leaf_node_num_cells(void* node) {
return node + LEAF_NODE_NUM_CELLS_OFFSET;
}
// 叶子节点上第cell_num个cell的偏移量的地址
void* leaf_node_cell(void* node, uint32_t cell_num) {
return node + LEAF_NODE_HEADER_SIZE + cell_num * LEAF_NODE_CELL_SIZE;
}
// 叶子节点上第cell_num个key的偏移量,因为cell的前面放的是key
uint32_t* leaf_node_key(void* node, uint32_t cell_num) {
return leaf_node_cell(node, cell_num);
}
// 叶子节点上第cell_num个cell的value地址获取
void* leaf_node_value(void* node, uint32_t cell_num) {
return leaf_node_cell(node, cell_num) + LEAF_NODE_KEY_SIZE;
}
// 初始化一个节点,将cell_num设置为0
void initialize_leaf_node(void* node) { *leaf_node_num_cells(node) = 0; }
2.2 Table和Pager的改动
首先整个设计考虑简单点,不支持部分页面的读取,每次读取是读取整个页面。
const uint32_t PAGE_SIZE = 4096;
const uint32_t TABLE_MAX_PAGES = 100;
typedef struct {
int file_descriptor;
uint32_t file_length;
+ uint32_t num_pages;
void* pages[TABLE_MAX_PAGES];
} Pager;
typedef struct {
Pager* pager;
- uint32_t num_rows;
+ uint32_t root_page_num;
} Table;
在新的定义中,我们固定了每个表的最大行数。另外我们将page的数量保存在Pager中。在Table中保存根页面的page_num,这样我们就可以通过表方便找到root页面了。
下面是一些关键行数的修改:
void* get_page(Pager* pager, uint32_t page_num) {
pager->pages[page_num] = page;
if (page_num >= pager->num_pages) {
pager->num_pages = page_num + 1;
}
return pager->pages[page_num];
}
///
Pager* pager_open(const char* filename) {
Pager* pager = malloc(sizeof(Pager));
pager->file_descriptor = fd;
pager->file_length = file_length;
pager->num_pages = (file_length / PAGE_SIZE);
if (file_length % PAGE_SIZE != 0) {
printf("Db file is not a whole number of pages. Corrupt file.\n");
exit(EXIT_FAILURE);
}
2.3 游标的更改
游标定位数据,以前通过行来定位,现在通过页面号和cell号来定位数据。
typedef struct {
Table* table;
- uint32_t row_num;
+ uint32_t page_num;
+ uint32_t cell_num;
bool end_of_table; // Indicates a position one past the last element
} Cursor;
游标创建:
Cursor* table_start(Table* table) {
Cursor* cursor = malloc(sizeof(Cursor));
cursor->table = table;
- cursor->row_num = 0;
- cursor->end_of_table = (table->num_rows == 0);
+ cursor->page_num = table->root_page_num;
+ cursor->cell_num = 0;
+
+ void* root_node = get_page(table->pager, table->root_page_num);
+ uint32_t num_cells = *leaf_node_num_cells(root_node);
+ cursor->end_of_table = (num_cells == 0);
return cursor;
}
表开始的游标定位,游标的页面为表的根页面编号,通过leaf_node_num_cells
行数获取root节点中cell的数量。
Cursor* table_end(Table* table) {
Cursor* cursor = malloc(sizeof(Cursor));
cursor->table = table;
- cursor->row_num = table->num_rows;
+ cursor->page_num = table->root_page_num;
+
+ void* root_node = get_page(table->pager, table->root_page_num);
+ uint32_t num_cells = *leaf_node_num_cells(root_node);
+ cursor->cell_num = num_cells;
cursor->end_of_table = true;
return cursor;
}
这个是表的结束页的游标,设置end_of_table的标志。
void* cursor_value(Cursor* cursor) {
- uint32_t row_num = cursor->row_num;
- uint32_t page_num = row_num / ROWS_PER_PAGE;
+ uint32_t page_num = cursor->page_num;
void* page = get_page(cursor->table->pager, page_num);
- uint32_t row_offset = row_num % ROWS_PER_PAGE;
- uint32_t byte_offset = row_offset * ROW_SIZE;
- return page + byte_offset;
+ return leaf_node_value(page, cursor->cell_num);
}
获取游标处的值,获取cell_num的value,由于每个cell的大小一样,所以也是类似的取值方法。游标的递增:
void cursor_advance(Cursor* cursor) {
- cursor->row_num += 1;
- if (cursor->row_num >= cursor->table->num_rows) {
+ uint32_t page_num = cursor->page_num;
+ void* node = get_page(cursor->table->pager, page_num);
+
+ cursor->cell_num += 1;
+ if (cursor->cell_num >= (*leaf_node_num_cells(node))) {
cursor->end_of_table = true;
}
}
这里面每次递增,只增加cell的数量,也许你会说cell递增到最后一个cell怎么办,其实增加到最后到最后一个cell后,设置了表结束的标志,循环退出了。
数据库打开打开数据库,如果是一个新的数据库,初始化一个页面作为叶子节点。
Table* db_open(const char* filename) {
Pager* pager = pager_open(filename);
Table* table = malloc(sizeof(Table));
table->pager = pager;
table->root_page_num = 0;
if (pager->num_pages == 0) {
// New database file. Initialize page 0 as leaf node.
void* root_node = get_page(pager, 0);
initialize_leaf_node(root_node);
}
return table;
}
下面是个关键行数,先叶子节点插入数据:
void leaf_node_insert(Cursor* cursor, uint32_t key, Row* value) {
void* node = get_page(cursor->table->pager, cursor->page_num);
uint32_t num_cells = *leaf_node_num_cells(node);
if (num_cells >= LEAF_NODE_MAX_CELLS) {
// 节点满了
printf("Need to implement splitting a leaf node.\n");
exit(EXIT_FAILURE);
}
if (cursor->cell_num < num_cells) {
// 为一个cell腾出位置位置
for (uint32_t i = num_cells; i > cursor->cell_num; i--) {
memcpy(leaf_node_cell(node, i), leaf_node_cell(node, i - 1),
LEAF_NODE_CELL_SIZE);
}
}
// 增加cell_num,设置key和持久化row。
*(leaf_node_num_cells(node)) += 1;
*(leaf_node_key(node, cursor->cell_num)) = key;
serialize_row(value, leaf_node_value(node, cursor->cell_num));
}
这个函数假设树只有一个页面,目前版本先不支持多个页面。插入操作:
ExecuteResult execute_insert(Statement* statement, Table* table) {
void* node = get_page(table->pager, table->root_page_num);
if ((*leaf_node_num_cells(node) >= LEAF_NODE_MAX_CELLS)) {
return EXECUTE_TABLE_FULL;
}
Row* row_to_insert = &(statement->row_to_insert);
Cursor* cursor = table_end(table);
leaf_node_insert(cursor, row_to_insert->id, row_to_insert);
free(cursor);
}
三 打印命令
打印meta信息即元数据信息。
void print_constants() {
printf("ROW_SIZE: %d\n", ROW_SIZE);
printf("COMMON_NODE_HEADER_SIZE: %d\n", COMMON_NODE_HEADER_SIZE);
printf("LEAF_NODE_HEADER_SIZE: %d\n", LEAF_NODE_HEADER_SIZE);
printf("LEAF_NODE_CELL_SIZE: %d\n", LEAF_NODE_CELL_SIZE);
printf("LEAF_NODE_SPACE_FOR_CELLS: %d\n", LEAF_NODE_SPACE_FOR_CELLS);
printf("LEAF_NODE_MAX_CELLS: %d\n", LEAF_NODE_MAX_CELLS);
}
MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table* table) {
if (strcmp(input_buffer->buffer, ".exit") == 0) {
db_close(table);
exit(EXIT_SUCCESS);
} else if (strcmp(input_buffer->buffer, ".constants") == 0) {
printf("Constants:\n");
print_constants();
return META_COMMAND_SUCCESS;
} else {
return META_COMMAND_UNRECOGNIZED_COMMAND;
}
没啥需要特别说明的,只是支持一个打印常量元数据的命令。
四 树的可视化
为了帮助调试,增加打印树的功能:
void print_leaf_node(void* node) {
uint32_t num_cells = *leaf_node_num_cells(node);
printf("leaf (size %d)\n", num_cells);
for (uint32_t i = 0; i < num_cells; i++) {
uint32_t key = *leaf_node_key(node, i);
printf(" - %d : %d\n", i, key);
}
}
遍历节点,打印cell信息。增加meta命令:
MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table* table) {
if (strcmp(input_buffer->buffer, ".exit") == 0) {
db_close(table);
exit(EXIT_SUCCESS);
} else if (strcmp(input_buffer->buffer, ".btree") == 0) {
printf("Tree:\n");
print_leaf_node(get_page(table->pager, 0));
return META_COMMAND_SUCCESS;
} else if (strcmp(input_buffer->buffer, ".constants") == 0) {
printf("Constants:\n");
print_constants();
return META_COMMAND_SUCCESS;
} else {
return META_COMMAND_UNRECOGNIZED_COMMAND;
}
这次最大的改动是文件的存储结构改变,改成B+树方式,但是我们并没有对文件里面的cell按照key进行排序,而且我们只支持一个页面,不过仍然是重大的进步,慢慢来吧。