目录结构
注:提前言明 本文借鉴了以下博主、书籍或网站的内容,其列表如下:
1、参考书籍:《PostgreSQL数据库内核分析》
2、参考书籍:《数据库事务处理的艺术:事务管理与并发控制》
3、PostgreSQL数据库仓库链接,点击前往
4、日本著名PostgreSQL数据库专家 铃木启修 网站主页,点击前往
5、参考书籍:《PostgreSQL中文手册》
6、参考书籍:《PostgreSQL指南:内幕探索》,点击前往
7、参考书籍:《事务处理 概念与技术》
1、本文内容全部来源于开源社区 GitHub和以上博主的贡献,本文也免费开源(可能会存在问题,评论区等待大佬们的指正)
2、本文目的:开源共享 抛砖引玉 一起学习
3、本文不提供任何资源 不存在任何交易 与任何组织和机构无关
4、大家可以根据需要自行 复制粘贴以及作为其他个人用途,但是不允许转载 不允许商用 (写作不易,还请见谅 💖)
5、本文内容基于PostgreSQL master源码开发而成
高级数据结构和算法之快速排序算法quicksort的实现及使用
- 文章快速说明索引
- 快速排序内部使用
- 快速排序内部实现
文章快速说明索引
学习目标:
做数据库内核开发久了就会有一种 少年得志,年少轻狂 的错觉,然鹅细细一品觉得自己其实不算特别优秀 远远没有达到自己想要的。也许光鲜的表面掩盖了空洞的内在,每每想到于此,皆有夜半临渊如履薄冰之感。为了睡上几个踏实觉,即日起 暂缓其他基于PostgreSQL数据库的兼容功能开发,近段时间 将着重于学习分享Postgres的基础知识和实践内幕。
学习内容:(详见目录)
1、高级数据结构和算法之快速排序算法quicksort的实现及使用
学习时间:
2024年10月12日 21:01:55
学习产出:
1、PostgreSQL数据库基础知识回顾 1个
2、CSDN 技术博客 1篇
3、PostgreSQL数据库内核深入学习
注:下面我们所有的学习环境是Centos8+PostgreSQL master +Oracle19C+MySQL8.0
postgres=# select version();
version
------------------------------------------------------------------------------------------------------------
PostgreSQL 18devel on x86_64-pc-linux-gnu, compiled by gcc (GCC) 8.5.0 20210514 (Red Hat 8.5.0-21), 64-bit
(1 row)
postgres=#
#-----------------------------------------------------------------------------#
SQL> select * from v$version;
BANNER Oracle Database 19c EE Extreme Perf Release 19.0.0.0.0 - Production
BANNER_FULL Oracle Database 19c EE Extreme Perf Release 19.0.0.0.0 - Production Version 19.17.0.0.0
BANNER_LEGACY Oracle Database 19c EE Extreme Perf Release 19.0.0.0.0 - Production
CON_ID 0
#-----------------------------------------------------------------------------#
mysql> select version();
+-----------+
| version() |
+-----------+
| 8.0.27 |
+-----------+
1 row in set (0.06 sec)
mysql>
快速排序内部使用
不知道大家在日常的学习和工作中,有没有注意到PostgreSQL在内部实际上是使用了一些高级数据结构和算法?例如下面这个简单的执行计划:
postgres=# SET enable_hashjoin = 'off';
SET
postgres=# explain (verbose, costs off, analyze) SELECT * FROM norm_test WHERE x IN (VALUES (1), (29));
QUERY PLAN
---------------------------------------------------------------------------------------------
Nested Loop (actual time=0.184..0.309 rows=97 loops=1)
Output: norm_test.x, norm_test.payload
-> Unique (actual time=0.010..0.014 rows=2 loops=1)
Output: "*VALUES*".column1
-> Sort (actual time=0.009..0.010 rows=2 loops=1)
Output: "*VALUES*".column1
Sort Key: "*VALUES*".column1
Sort Method: quicksort Memory: 25kB
-> Values Scan on "*VALUES*" (actual time=0.002..0.003 rows=2 loops=1)
Output: "*VALUES*".column1
-> Bitmap Heap Scan on public.norm_test (actual time=0.089..0.135 rows=48 loops=2)
Output: norm_test.x, norm_test.payload
Recheck Cond: (norm_test.x = "*VALUES*".column1)
Heap Blocks: exact=10
-> Bitmap Index Scan on norm_test_x_idx (actual time=0.061..0.061 rows=48 loops=2)
Index Cond: (norm_test.x = "*VALUES*".column1)
Planning Time: 0.442 ms
Execution Time: 0.373 ms
(18 rows)
postgres=#
如上,中间出现Sort Method: quicksort
这样的字眼,那么数据库内部是怎么实现以及使用这个快速排序的呢?当时上学的时候,没有少手撕代码,死去的回忆突然间开始攻击我!有兴趣的小伙伴可以看一下本人之前(long long time ago)的博客:
- DSA之十大排序算法第六种:Quick Sort,点击前往
简单看了一眼内核代码,该算法是快速排序的变种算法,该算法是改编至J. L. Bentley and M. D. McIlroy在专刊Software–Practice and Experience发表的名为Engineering a sort function论文,如下:
// src/include/lib/sort_template.h
/*
* Qsort routine based on J. L. Bentley and M. D. McIlroy,
* "Engineering a sort function",
* Software--Practice and Experience 23 (1993) 1249-1265.
*
* We have modified their original by adding a check for already-sorted
* input, which seems to be a win per discussions on pgsql-hackers around
* 2006-03-21.
* 我们修改了原来的版本,增加了对已排序输入的检查,
* 根据 2006-03-21 左右 pgsql-hackers 上的讨论,这似乎是一个胜利。
*
* Also, we recurse on the smaller partition and iterate on the larger one,
* which ensures we cannot recurse more than log(N) levels (since the
* partition recursed to is surely no more than half of the input). Bentley
* and McIlroy explicitly rejected doing this on the grounds that it's "not
* worth the effort", but we have seen crashes in the field due to stack
* overrun, so that judgment seems wrong.
* 此外,我们在较小的分区上进行递归,在较大的分区上进行迭代,
* 这确保了我们不能递归超过 log(N) 级(因为递归到的分区肯定不超过输入的一半)。
*
* Bentley 和 McIlroy 明确拒绝这样做,理由是“不值得付出努力”,
* 但我们在现场看到过由于堆栈溢出而导致的崩溃,因此这种判断似乎是错误的。
*/
在PostgreSQL内核中直接调用非常简单,本来想的是直接调试一个现有的逻辑 但是复杂的调试过程不便于理解学习的重点。我随便写了一个简单的插件,因为比较简单 而且我很懒,就不再仓库上传,这里直接贴源码 如下:
[postgres@localhost:~/postgres/contrib/test_use_qsort → master]$ git branch
REL_17_0
* master
[postgres@localhost:~/postgres/contrib/test_use_qsort → master]$ ls
Makefile meson.build test_use_qsort--1.0.sql test_use_qsort.c test_use_qsort.control test_use_qsort.o test_use_qsort.so
[postgres@localhost:~/postgres/contrib/test_use_qsort → master]$
[postgres@localhost:~/postgres/contrib/test_use_qsort → master]$ make clean
rm -f test_use_qsort.so test_use_qsort.o \
test_use_qsort.bc
[postgres@localhost:~/postgres/contrib/test_use_qsort → master]$
[postgres@localhost:~/postgres/contrib/test_use_qsort → master]$ pwd
/home/postgres/postgres/contrib/test_use_qsort
[postgres@localhost:~/postgres/contrib/test_use_qsort → master]$
/* -------------------------------------------------------------------------
*
* test_use_qsort.c
*
* Copyright (c) 2010-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/test_use_qsort/test_use_qsort.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "utils/hsearch.h"
#include <stdlib.h>
#include <time.h>
PG_FUNCTION_INFO_V1(test_internal_qsort);
PG_MODULE_MAGIC;
typedef struct Student
{
int class_id;
int student_id;
char *name;
bool gender;
} Student;
static inline int student_comparator(const Student *a, const Student *b);
#define ST_SORT sort_student
#define ST_ELEMENT_TYPE Student
#define ST_COMPARE(a, b) student_comparator(a, b)
#define ST_SCOPE static
#define ST_DEFINE
#include <lib/sort_template.h>
typedef struct
{
int class_id;
int student_id;
} StudentKey;
typedef struct StudentEntry
{
StudentKey key;
bool active;
} StudentEntry;
static inline int
student_comparator(const Student *a, const Student *b)
{
/* compare student */
if (a->class_id < b->class_id)
return -1;
else if (a->class_id > b->class_id)
return 1;
if (a->student_id < b->student_id)
return -1;
else if (a->student_id > b->student_id)
return 1;
return 0;
}
void
_PG_init(void)
{
/* other plugins can perform things here */
}
Datum test_internal_qsort(PG_FUNCTION_ARGS)
{
int max_class = 3,
max_student = 20;
Student *students = NULL;
HASHCTL hashCtl;
HTAB *student_hash = NULL;
int stu_num = PG_GETARG_INT32(0);
if(stu_num <= 0 || stu_num > max_class * max_student)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid num")));
}
students = (Student *)palloc0(sizeof(Student) * stu_num);
hashCtl.keysize = sizeof(StudentKey);
hashCtl.entrysize = sizeof(StudentEntry);
student_hash = hash_create("student hash",
stu_num,
&hashCtl,
HASH_ELEM | HASH_BLOBS);
srand((unsigned)time(NULL));
for(int i = 0; i < stu_num; ++i)
{
bool found = false;
StudentKey key = {0, 0};
StudentEntry *entry = NULL;
char student_name[24] = { 0 };
retry:
key.class_id = rand() % max_class + 1;
key.student_id = rand() % max_student + 1;
hash_search(student_hash, &key, HASH_FIND, &found);
if(found)
{
goto retry;
}
else
{
entry = hash_search(student_hash, &key, HASH_ENTER, NULL);
entry->active = true;
}
students[i].class_id = key.class_id;
students[i].student_id = key.student_id;
#define STUDENT_NAME_PREFIX "songbaobao"
sprintf(student_name, "%s_%d_%d", STUDENT_NAME_PREFIX, students[i].class_id, students[i].student_id);
students[i].name = pstrdup(student_name);
students[i].gender = (rand() % 1) == 0 ? false : true;
}
for(int i = 0; i < stu_num; ++i)
{
ereport(NOTICE,
(errmsg("Student %2d class id: %d, student id: %2d, student name: %s",
i + 1, students[i].class_id, students[i].student_id, students[i].name)));
}
ereport(NOTICE,
(errmsg("Sorting, please wait...")));
sort_student(students, stu_num);
ereport(NOTICE,
(errmsg("Sorting End.")));
for(int i = 0; i < stu_num; ++i)
{
ereport(NOTICE,
(errmsg("Student %2d class id: %d, student id: %2d, student name: %s",
i + 1, students[i].class_id, students[i].student_id, students[i].name)));
}
for(int i = 0; i < stu_num; ++i)
{
pfree(students[i].name);
}
pfree(students);
hash_destroy(student_hash);
PG_RETURN_BOOL(true);
}
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION test_use_qsort" to load this file. \quit
CREATE FUNCTION test_internal_qsort(studentNum int)
RETURNS bool
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
编译安装之后,简单测试如下:
postgres=# select test_internal_qsort(10);
NOTICE: Student 1 class id: 1, student id: 8, student name: songbaobao_1_8
NOTICE: Student 2 class id: 3, student id: 3, student name: songbaobao_3_3
NOTICE: Student 3 class id: 3, student id: 12, student name: songbaobao_3_12
NOTICE: Student 4 class id: 1, student id: 17, student name: songbaobao_1_17
NOTICE: Student 5 class id: 3, student id: 18, student name: songbaobao_3_18
NOTICE: Student 6 class id: 1, student id: 4, student name: songbaobao_1_4
NOTICE: Student 7 class id: 3, student id: 14, student name: songbaobao_3_14
NOTICE: Student 8 class id: 2, student id: 5, student name: songbaobao_2_5
NOTICE: Student 9 class id: 2, student id: 3, student name: songbaobao_2_3
NOTICE: Student 10 class id: 2, student id: 4, student name: songbaobao_2_4
NOTICE: Sorting, please wait...
NOTICE: Sorting End.
NOTICE: Student 1 class id: 1, student id: 4, student name: songbaobao_1_4
NOTICE: Student 2 class id: 1, student id: 8, student name: songbaobao_1_8
NOTICE: Student 3 class id: 1, student id: 17, student name: songbaobao_1_17
NOTICE: Student 4 class id: 2, student id: 3, student name: songbaobao_2_3
NOTICE: Student 5 class id: 2, student id: 4, student name: songbaobao_2_4
NOTICE: Student 6 class id: 2, student id: 5, student name: songbaobao_2_5
NOTICE: Student 7 class id: 3, student id: 3, student name: songbaobao_3_3
NOTICE: Student 8 class id: 3, student id: 12, student name: songbaobao_3_12
NOTICE: Student 9 class id: 3, student id: 14, student name: songbaobao_3_14
NOTICE: Student 10 class id: 3, student id: 18, student name: songbaobao_3_18
test_internal_qsort
---------------------
t
(1 row)
postgres=#
如上函数非常简单,这里不再赘述。核心代码,如下:
...
#define ST_SORT sort_student
#define ST_ELEMENT_TYPE Student
#define ST_COMPARE(a, b) student_comparator(a, b)
#define ST_SCOPE static
#define ST_DEFINE
#include <lib/sort_template.h>
...
快速排序内部实现
接下来,我们就看一下数据库内部算法的核心实现,如下:
// src/include/lib/sort_template.h
*
* Usage notes:
*
* To generate functions specialized for a type, the following parameter
* macros should be #define'd before this file is included.
* 要生成专门针对某种类型的函数,应在包含此文件之前#define 以下参数宏。
*
* - ST_SORT - the name of a sort function to be generated 要生成的排序函数的名称
* - ST_ELEMENT_TYPE - type of the referenced elements 引用元素的类型
* - ST_DECLARE - if defined the functions and types are declared 如果定义了函数和类型则声明
* - ST_DEFINE - if defined the functions and types are defined 如果已定义,则定义了函数和类型
* - ST_SCOPE - scope (e.g. extern, static inline) for functions
* - ST_CHECK_FOR_INTERRUPTS - if defined the sort is interruptible 如果已定义,则排序是可中断的
*
* Instead of ST_ELEMENT_TYPE, ST_ELEMENT_TYPE_VOID can be defined. Then
* the generated functions will automatically gain an "element_size"
* parameter. This allows us to generate a traditional qsort function.
* 可以定义 ST_ELEMENT_TYPE_VOID 来代替 ST_ELEMENT_TYPE。
* 然后生成的函数将自动获得“element_size”参数。
* 这使我们能够生成传统的 qsort 函数。
*
* One of the following macros must be defined, to show how to compare
* elements. The first two options are arbitrary expressions depending
* on whether an extra pass-through argument is desired, and the third
* option should be defined if the sort function should receive a
* function pointer at runtime.
* 必须定义以下宏之一,以显示如何比较元素。
* 前两个选项是任意表达式,具体取决于是否需要额外的传递参数,如果排序函数应在运行时接收函数指针,则应定义第三个选项。
*
* - ST_COMPARE(a, b) - a simple comparison expression
* - ST_COMPARE(a, b, arg) - variant that takes an extra argument 接受额外参数的变体
* - ST_COMPARE_RUNTIME_POINTER - sort function takes a function pointer 排序函数接受一个函数指针
*
* NB: If the comparator function is inlined, some compilers may produce
* worse code with the optimized comparison routines in common/int.h than
* with code with the following form:
*
* if (a < b)
* return -1;
* if (a > b)
* return 1;
* return 0;
*
* To say that the comparator and therefore also sort function should
* receive an extra pass-through argument, specify the type of the
* argument.
* 要说明比较器以及排序函数应该接收额外的传递参数,请指定参数的类型。
*
* - ST_COMPARE_ARG_TYPE - type of extra argument
*
* The prototype of the generated sort function is:
*
* void ST_SORT(ST_ELEMENT_TYPE *data, size_t n,
* [size_t element_size,]
* [ST_SORT_compare_function compare,]
* [ST_COMPARE_ARG_TYPE *arg]);
*
* ST_SORT_compare_function is a function pointer of the following type:
*
* int (*)(const ST_ELEMENT_TYPE *a, const ST_ELEMENT_TYPE *b,
* [ST_COMPARE_ARG_TYPE *arg])
*
如上 宏的设置根据自己的需要而定,就像数据库中其他使用的那样:
// src/backend/utils/sort/tuplesort.c
#define ST_SORT qsort_tuple_unsigned
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
#define ST_COMPARE_ARG_TYPE Tuplesortstate
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DEFINE
#include "lib/sort_template.h"
#if SIZEOF_DATUM >= 8
#define ST_SORT qsort_tuple_signed
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
#define ST_COMPARE_ARG_TYPE Tuplesortstate
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DEFINE
#include "lib/sort_template.h"
#endif
#define ST_SORT qsort_tuple_int32
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
#define ST_COMPARE_ARG_TYPE Tuplesortstate
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DEFINE
#include "lib/sort_template.h"
#define ST_SORT qsort_tuple
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE_RUNTIME_POINTER
#define ST_COMPARE_ARG_TYPE Tuplesortstate
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DECLARE
#define ST_DEFINE
#include "lib/sort_template.h"
#define ST_SORT qsort_ssup
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE(a, b, ssup) \
ApplySortComparator((a)->datum1, (a)->isnull1, \
(b)->datum1, (b)->isnull1, (ssup))
#define ST_COMPARE_ARG_TYPE SortSupportData
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DEFINE
#include "lib/sort_template.h"
接下来,看一下今天的重中之重ST_SORT
函数,如下:
/*
* Sort an array.
*/
ST_SCOPE void
ST_SORT(ST_ELEMENT_TYPE * data, size_t n
ST_SORT_PROTO_ELEMENT_SIZE
ST_SORT_PROTO_COMPARE
ST_SORT_PROTO_ARG)
{
ST_POINTER_TYPE *a = (ST_POINTER_TYPE *) data,
*pa,
*pb,
*pc,
*pd,
*pl,
*pm,
*pn;
size_t d1,
d2;
int r,
presorted;
loop:
DO_CHECK_FOR_INTERRUPTS();
if (n < 7)
{
for (pm = a + ST_POINTER_STEP; pm < a + n * ST_POINTER_STEP;
pm += ST_POINTER_STEP)
for (pl = pm; pl > a && DO_COMPARE(pl - ST_POINTER_STEP, pl) > 0;
pl -= ST_POINTER_STEP)
DO_SWAP(pl, pl - ST_POINTER_STEP);
return;
}
presorted = 1;
for (pm = a + ST_POINTER_STEP; pm < a + n * ST_POINTER_STEP;
pm += ST_POINTER_STEP)
{
DO_CHECK_FOR_INTERRUPTS();
if (DO_COMPARE(pm - ST_POINTER_STEP, pm) > 0)
{
presorted = 0;
break;
}
}
if (presorted)
return;
pm = a + (n / 2) * ST_POINTER_STEP;
if (n > 7)
{
pl = a;
pn = a + (n - 1) * ST_POINTER_STEP;
if (n > 40)
{
size_t d = (n / 8) * ST_POINTER_STEP;
pl = DO_MED3(pl, pl + d, pl + 2 * d);
pm = DO_MED3(pm - d, pm, pm + d);
pn = DO_MED3(pn - 2 * d, pn - d, pn);
}
pm = DO_MED3(pl, pm, pn);
}
DO_SWAP(a, pm);
pa = pb = a + ST_POINTER_STEP;
pc = pd = a + (n - 1) * ST_POINTER_STEP;
for (;;)
{
while (pb <= pc && (r = DO_COMPARE(pb, a)) <= 0)
{
if (r == 0)
{
DO_SWAP(pa, pb);
pa += ST_POINTER_STEP;
}
pb += ST_POINTER_STEP;
DO_CHECK_FOR_INTERRUPTS();
}
while (pb <= pc && (r = DO_COMPARE(pc, a)) >= 0)
{
if (r == 0)
{
DO_SWAP(pc, pd);
pd -= ST_POINTER_STEP;
}
pc -= ST_POINTER_STEP;
DO_CHECK_FOR_INTERRUPTS();
}
if (pb > pc)
break;
DO_SWAP(pb, pc);
pb += ST_POINTER_STEP;
pc -= ST_POINTER_STEP;
}
pn = a + n * ST_POINTER_STEP;
d1 = Min(pa - a, pb - pa);
DO_SWAPN(a, pb - d1, d1);
d1 = Min(pd - pc, pn - pd - ST_POINTER_STEP);
DO_SWAPN(pb, pn - d1, d1);
d1 = pb - pa;
d2 = pd - pc;
if (d1 <= d2)
{
/* Recurse on left partition, then iterate on right partition */
if (d1 > ST_POINTER_STEP)
DO_SORT(a, d1 / ST_POINTER_STEP);
if (d2 > ST_POINTER_STEP)
{
/* Iterate rather than recurse to save stack space */
/* DO_SORT(pn - d2, d2 / ST_POINTER_STEP) */
a = pn - d2;
n = d2 / ST_POINTER_STEP;
goto loop;
}
}
else
{
/* Recurse on right partition, then iterate on left partition */
if (d2 > ST_POINTER_STEP)
DO_SORT(pn - d2, d2 / ST_POINTER_STEP);
if (d1 > ST_POINTER_STEP)
{
/* Iterate rather than recurse to save stack space */
/* DO_SORT(a, d1 / ST_POINTER_STEP) */
n = d1 / ST_POINTER_STEP;
goto loop;
}
}
}
第一部分:小数组的处理:对于小于7个元素的数组,函数使用插入排序,这是一种时间复杂度为O(n²)的简单算法,但在小数据集上的表现优于快速排序。这通过以下代码实现:
if (n < 7)
{
for (pm = a + ST_POINTER_STEP; pm < a + n * ST_POINTER_STEP;
pm += ST_POINTER_STEP)
for (pl = pm; pl > a && DO_COMPARE(pl - ST_POINTER_STEP, pl) > 0;
pl -= ST_POINTER_STEP)
DO_SWAP(pl, pl - ST_POINTER_STEP);
return;
}
第二部分:验证是否已经排过序,已然有序就可以直接返回了:
presorted = 1;
for (pm = a + ST_POINTER_STEP; pm < a + n * ST_POINTER_STEP;
pm += ST_POINTER_STEP)
{
DO_CHECK_FOR_INTERRUPTS();
if (DO_COMPARE(pm - ST_POINTER_STEP, pm) > 0)
{
presorted = 0;
break;
}
}
if (presorted)
return;
第三部分:array size大于7,使用快排:首先求取基准数,尤其是当数组大小超过40,为了更好取样,将划分8个为一组,求取median:
pm = a + (n / 2) * ST_POINTER_STEP;
if (n > 7)
{
pl = a;
pn = a + (n - 1) * ST_POINTER_STEP;
if (n > 40)
{
size_t d = (n / 8) * ST_POINTER_STEP;
pl = DO_MED3(pl, pl + d, pl + 2 * d);
pm = DO_MED3(pm - d, pm, pm + d);
pn = DO_MED3(pn - 2 * d, pn - d, pn);
}
pm = DO_MED3(pl, pm, pn);
}
这里的DO_MED3
目的:找到3个数的中间数(靠的是compare),如下:
/*
* Find the median of three values. Currently, performance seems to be best
* if the comparator is inlined here, but the med3 function is not inlined
* in the qsort function.
* 找到三个值的中位数。
* 目前,如果比较器在此处内联,性能似乎最佳,但 med3 函数未在 qsort 函数中内联。
*
* Refer to the comment at the top of this file for known caveats to consider
* when writing inlined comparator functions.
*/
static pg_noinline ST_ELEMENT_TYPE *
ST_MED3(ST_ELEMENT_TYPE * a,
ST_ELEMENT_TYPE * b,
ST_ELEMENT_TYPE * c
ST_SORT_PROTO_COMPARE
ST_SORT_PROTO_ARG)
{
return DO_COMPARE(a, b) < 0 ?
(DO_COMPARE(b, c) < 0 ? b : (DO_COMPARE(a, c) < 0 ? c : a))
: (DO_COMPARE(b, c) > 0 ? b : (DO_COMPARE(a, c) < 0 ? a : c));
}
然后正式进入排序的过程,首先 pa,pb 指向第二个元素, pc,pd指向尾部;而基准数则是放在第一位:
DO_SWAP(a, pm);
pa = pb = a + ST_POINTER_STEP;
pc = pd = a + (n - 1) * ST_POINTER_STEP;
for (;;)
{
while (pb <= pc && (r = DO_COMPARE(pb, a)) <= 0)
{
if (r == 0)
{
DO_SWAP(pa, pb);
pa += ST_POINTER_STEP;
}
pb += ST_POINTER_STEP;
DO_CHECK_FOR_INTERRUPTS();
}
while (pb <= pc && (r = DO_COMPARE(pc, a)) >= 0)
{
if (r == 0)
{
DO_SWAP(pc, pd);
pd -= ST_POINTER_STEP;
}
pc -= ST_POINTER_STEP;
DO_CHECK_FOR_INTERRUPTS();
}
if (pb > pc)
break;
DO_SWAP(pb, pc);
pb += ST_POINTER_STEP;
pc -= ST_POINTER_STEP;
}
- 上面第一个while循环是从头开始筛选出与基准数相等和小于基准数的值,如果相等则从头向尾排列
- 注意基准数
也就是a
都是在和它进行比较 - 而第二个循环则是从尾开始筛选出与基准数相等和大于基准数的值,如果相等则从尾向头排列
下面是一轮排序完后的可能图,头和尾都是与基准数相等的数值,再是小于和大于的数值,中间是还未比较的数,因为pb大于基准数,pc小于基准数,会导致循环退出,此时将两者交换进行下轮比较。
而排序完后的示意图,如下:
第四部分:这里还需要对相等数据进行合并,如下:
pn = a + n * ST_POINTER_STEP;
d1 = Min(pa - a, pb - pa);
DO_SWAPN(a, pb - d1, d1); // 判断相等数据量 和 小于的数据量,实际只要移动数据量少的一方
d1 = Min(pd - pc, pn - pd - ST_POINTER_STEP);
DO_SWAPN(pb, pn - d1, d1); // 判断相等数据量 和 大于的数据量,实际只要移动数据量少的一方
d1 = pb - pa; // 小于区的长度 乱序
d2 = pd - pc; // 大于区的长度 乱序
如上,合并完,中间部分是与基准数相等的,左边是小于基准数的,右边是大于基准数的。
第五部分:接下来可以对两边不等的数据递归进行减而治之,PostgreSQL并没有简单递归,而是对数据量小的部分递归,大的部分进行迭代(原因上面已经说了)。
if (d1 <= d2)
{
/* Recurse on left partition, then iterate on right partition */
// 在左分区上递归,然后在右分区上迭代
if (d1 > ST_POINTER_STEP)
DO_SORT(a, d1 / ST_POINTER_STEP);
if (d2 > ST_POINTER_STEP)
{
/* Iterate rather than recurse to save stack space */
// 迭代而不是递归以节省堆栈空间
/* DO_SORT(pn - d2, d2 / ST_POINTER_STEP) */
a = pn - d2;
n = d2 / ST_POINTER_STEP;
goto loop;
}
}
else
{
/* Recurse on right partition, then iterate on left partition */
if (d2 > ST_POINTER_STEP)
DO_SORT(pn - d2, d2 / ST_POINTER_STEP);
if (d1 > ST_POINTER_STEP)
{
/* Iterate rather than recurse to save stack space */
// 在右分区上递归,然后在左分区上迭代
/* DO_SORT(a, d1 / ST_POINTER_STEP) */
n = d1 / ST_POINTER_STEP;
goto loop;
}
}
至于其他的后面有机会再深入学习!