深入理解 PostgreSQL 中的 tuplesort_performsort 函数
- 函数概述
- 函数源码
- 函数签名
- 核心功能
- 相关函数简介
- 代码结构与逻辑分析
- 1. 内存上下文切换
- 2. 调试跟踪(可选)
- 3. 状态机逻辑(switch 分支)
- 4. 调试跟踪(完成时)
- 5. 恢复内存上下文
- 关键特性与优化
- 1. 状态机设计
- 2. 状态转换图
- 3. 混合排序策略
- 4. 资源管理哲学
- 5. 异常处理机制
- 相关函数介绍
- tuplesort_sort_memtuples 函数
- 优化策略与特性
- 应用场景
- inittapes 函数
- dumptuples 函数
- leader_takeover_tapes 函数
- mergeruns 函数
声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 postgresql-15.0 的开源代码和《PostgresSQL数据库内核分析》一书
在 PostgreSQL
的元组排序(tuplesort
)模块中,tuplesort_performsort
是一个核心函数,负责执行排序的主要逻辑。它根据当前的排序状态(state->status
)选择不同的排序策略,处理串行和并行排序场景,并确保内存管理的高效性。本文将详细分析该函数的实现细节,探讨其作用、逻辑结构、状态转换以及相关优化策略,以帮助读者更好地理解 PostgreSQL
的排序机制。其中,tuplesort_performsort
函数的源码流程可见【PostgreSQL内核学习 —— (sort算子)】。
函数概述
函数源码
/*
* 执行排序的主入口函数,处理不同排序阶段的逻辑
*/
void
tuplesort_performsort(Tuplesortstate *state)
{
// 切换到排序专用的内存上下文,确保内存分配在正确环境中进行
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
#ifdef TRACE_SORT // 调试跟踪模块
if (trace_sort)
elog(LOG, "performsort of worker %d starting: %s",
state->worker, pg_rusage_show(&state->ru_start)); // 记录排序开始时的资源使用情况
#endif
// 根据当前排序状态选择处理逻辑
switch (state->status)
{
case TSS_INITIAL: // 初始状态(未开始排序)
if (SERIAL(state)) { // 串行模式
/* 单线程快速排序 */
tuplesort_sort_memtuples(state); // 对内存中的元组数组执行快速排序
state->status = TSS_SORTEDINMEM; // 更新状态为"内存已排序"
} else if (WORKER(state)) { // 并行工作线程模式
/*
* 并行Worker将数据写入磁带:
* 1. 初始化磁带存储(inittapes)
* 2. 将内存数据转储到磁带(dumptuples)
* 3. 标记不需要合并运行(worker_nomergeruns)
*/
inittapes(state, false); // 初始化磁带文件
dumptuples(state, true); // 将内存元组写入磁带
worker_nomergeruns(state); // 设置Worker不参与合并
state->status = TSS_SORTEDONTAPE; // 更新状态为"磁带已排序"
} else { // 并行Leader模式
/*
* Leader接管Worker的磁带并执行多路归并:
* 1. 接管磁带资源(leader_takeover_tapes)
* 2. 执行多路归并排序(mergeruns)
*/
leader_takeover_tapes(state); // 接管Worker的磁带资源
mergeruns(state); // 执行多路归并排序
}
// 重置迭代器状态
state->current = 0; // 重置当前读取位置
state->eof_reached = false; // 清除EOF标记
state->markpos_block = 0L; // 重置标记位置块号
state->markpos_offset = 0; // 重置标记位置偏移量
state->markpos_eof = false; // 清除标记EOF状态
break;
case TSS_BOUNDED: // 有界堆排序状态
/*
* 转换有界堆为有序数组:
* 1. 调用sort_bounded_heap处理堆结构
* 2. 更新状态为"内存已排序"
*/
sort_bounded_heap(state); // 将有界堆转换为有序数组
state->current = 0;
state->eof_reached = false;
state->markpos_offset = 0;
state->markpos_eof = false;
state->status = TSS_SORTEDINMEM; // 更新状态
break;
case TSS_BUILDRUNS: // 磁带构建运行状态
/*
* 完成磁带排序:
* 1. 将内存剩余元组写入磁带(dumptuples)
* 2. 执行多路归并(mergeruns)
*/
dumptuples(state, true); // 最终转储内存数据
mergeruns(state); // 执行磁带合并
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
state->markpos_eof = false;
break;
default: // 无效状态处理
elog(ERROR, "invalid tuplesort state"); // 抛出错误
break;
}
#ifdef TRACE_SORT // 调试跟踪模块
if (trace_sort) {
if (state->status == TSS_FINALMERGE) // 最终合并阶段日志
elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
state->worker, state->nInputTapes, pg_rusage_show(&state->ru_start));
else // 常规完成日志
elog(LOG, "performsort of worker %d done: %s",
state->worker, pg_rusage_show(&state->ru_start));
}
#endif
// 恢复原始内存上下文
MemoryContextSwitchTo(oldcontext);
}
函数签名
void tuplesort_performsort(Tuplesortstate *state);
- 参数:
Tuplesortstate *state
是一个指向排序状态结构的指针,包含了排序过程中的所有关键信息,如当前状态、内存数据、磁带文件等。 - 作用:作为排序执行的入口点,
tuplesort_performsort
根据状态机的设计,处理不同的排序阶段,包括内存排序、磁带写入和多路归并等。
核心功能
- 根据
state->status
的值,决定是执行内存快速排序、将数据写入磁带,还是进行多路归并。 - 支持串行模式和并行模式(包括
Worker
和Leader
角色)。 - 管理内存上下文,确保资源分配和释放的正确性。
- 重置迭代器状态,为后续读取操作做好准备。
相关函数简介
tuplesort_sort_memtuples
:内存中的快速排序实现。inittapes
:初始化磁带文件结构。dumptuples
:将内存元组写入磁带。worker_nomergeruns
:标记Worker
无需参与合并。leader_takeover_tapes
:Leader
接管Worker
的磁带资源。mergeruns
:执行多路归并排序。sort_bounded_heap
:将有界堆转换为有序数组。
这些函数共同构成了 tuplesort
模块的功能链。
代码结构与逻辑分析
以下是 tuplesort_performsort
的完整代码分解与分析:
1. 内存上下文切换
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
- 目的:将当前内存上下文切换到排序专用的
state->sortcontext
,确保排序过程中分配的内存都在这个上下文中,便于统一管理和释放。 - 重要性:
PostgreSQL
使用内存上下文来隔离不同操作的内存分配,防止内存泄漏或冲突。
在函数结束时,会通过 MemoryContextSwitchTo(oldcontext)
恢复原始上下文。
2. 调试跟踪(可选)
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "performsort of worker %d starting: %s",
state->worker, pg_rusage_show(&state->ru_start));
#endif
- 作用:如果启用了
TRACE_SORT
宏,记录排序开始时的资源使用情况(如CPU
和内存)。 - 工具:
pg_rusage_show
用于显示资源使用统计,便于性能分析和调试。
类似的调试日志也会在排序完成后记录,区分是否处于最终合并阶段。
3. 状态机逻辑(switch 分支)
tuplesort_performsort
的核心逻辑基于 state->status
的值,通过 switch-case
结构分支执行:
TSS_INITIAL:初始状态
case TSS_INITIAL:
if (SERIAL(state)) { // 串行模式
tuplesort_sort_memtuples(state); // 内存快速排序
state->status = TSS_SORTEDINMEM; // 更新状态
} else if (WORKER(state)) { // 并行 Worker 模式
inittapes(state, false); // 初始化磁带
dumptuples(state, true); // 写入磁带
worker_nomergeruns(state); // 标记无需合并
state->status = TSS_SORTEDONTAPE; // 更新状态
} else { // 并行 Leader 模式
leader_takeover_tapes(state); // 接管磁带
mergeruns(state); // 多路归并
}
// 重置迭代器状态
state->current = 0;
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
state->markpos_eof = false;
break;
串行模式 (SERIAL(state)
):
调用 tuplesort_sort_memtuples 对内存中的元组数组执行快速排序。
状态更新为 TSS_SORTEDINMEM,表示内存已排序完成。
并行 Worker
模式 (WORKER(state)
):
初始化磁带文件(inittapes)。
将内存中的元组写入磁带(dumptuples)。
调用 worker_nomergeruns 标记 Worker 不参与后续合并。
状态更新为 TSS_SORTEDONTAPE,表示数据已排序并存储在磁带上。
并行 Leader
模式:
调用 leader_takeover_tapes 接管 Worker 的磁带资源。
执行 mergeruns 进行多路归并排序。
迭代器重置:无论哪种模式,都会重置读取相关的状态变量(如 current
和 eof_reached
)。
TSS_BOUNDED:有界堆排序状态
case TSS_BOUNDED:
sort_bounded_heap(state); // 转换堆为有序数组
state->current = 0;
state->eof_reached = false;
state->markpos_offset = 0;
state->markpos_eof = false;
state->status = TSS_SORTEDINMEM; // 更新状态
break;
-
逻辑:
- 调用
sort_bounded_heap
将有界堆转换为有序数组。 - 更新状态为
TSS_SORTEDINMEM
。
- 调用
-
适用场景:用于需要限制内存使用的排序(如
Top-N
查询)。
TSS_BUILDRUNS:构建运行状态
case TSS_BUILDRUNS:
dumptuples(state, true); // 写入剩余元组
mergeruns(state); // 多路归并
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
state->markpos_eof = false;
break;
-
逻辑:
- 将内存中剩余的元组写入磁带(
dumptuples
)。 - 执行多路归并(
mergeruns
)完成排序。
- 将内存中剩余的元组写入磁带(
-
适用场景:处理大数据集时,内存不足以容纳所有元组,需分阶段写入磁带。
默认分支:无效状态
default:
elog(ERROR, "invalid tuplesort state");
break;
- 作用:如果
state->status
不匹配任何已知状态,抛出错误,确保程序健壮性。
4. 调试跟踪(完成时)
#ifdef TRACE_SORT
if (trace_sort) {
if (state->status == TSS_FINALMERGE)
elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
state->worker, state->nInputTapes, pg_rusage_show(&state->ru_start));
else
elog(LOG, "performsort of worker %d done: %s",
state->worker, pg_rusage_show(&state->ru_start));
}
#endif
- 作用:记录排序完成时的资源使用情况,特别区分是否处于最终合并阶段(
TSS_FINALMERGE
)。
5. 恢复内存上下文
MemoryContextSwitchTo(oldcontext);
- 目的:恢复调用前的内存上下文,确保不影响外部环境。
关键特性与优化
1. 状态机设计
Tuplesortstate
中的 status
字段定义了排序的不同阶段:
TSS_INITIAL
:初始状态,未开始排序。TSS_BOUNDED
:有界堆排序状态,用于内存受限场景。TSS_BUILDRUNS
:构建运行状态,处理大数据集。TSS_SORTEDINMEM
:内存已排序完成。TSS_SORTEDONTAPE
:磁带已排序完成。TSS_FINALMERGE
:最终合并阶段(调试日志中提及)。
这种状态机设计使得函数逻辑清晰,能够灵活应对不同场景。
2. 状态转换图
以下是 tuplesort_performsort
中状态转换的简化流程:
[TSS_INITIAL]
├─SERIAL → 快速排序 → [TSS_SORTEDINMEM]
├─WORKER → 磁带写入 → [TSS_SORTEDONTAPE]
└─LEADER → 多路归并 → [TSS_FINALMERGE]
TSS_BOUNDED → TSS_SORTEDINMEM
TSS_BUILDRUNS → (完成合并后,可能进入 TSS_SORTEDONTAPE 或 TSS_FINALMERGE)
3. 混合排序策略
根据数据规模动态选择算法:
数据量 < work_mem → 快速排序
work_mem < 数据量 < 10*work_mem → 堆排序
数据量 > 10*work_mem → 多阶段归并
4. 资源管理哲学
- 内存优先:最大限度利用可用内存
- 渐进式落盘:按需写入临时文件
- 并行流水线:计算与
I/O
重叠
5. 异常处理机制
- 内存超限:触发
"soft"
错误可恢复 - 磁盘空间不足:硬错误终止
- 并行协调:通过共享内存检测
Worker
状态
相关函数介绍
tuplesort_sort_memtuples 函数
本函数是PostgreSQL
内存排序的核心实现,根据不同的数据类型特征选择最优化的快速排序策略。主要执行流程为:
tuplesort_sort_memtuples
是 PostgreSQL tuplesort
机制中的核心函数之一,负责对 memtuples
数组中的元组执行快速排序 (quicksort
)。首先,它会检查 LEADER(state)
以确保当前不是 Leader
线程,然后判断 memtupcount
是否大于 1
,如果只有一个或没有元组,则无需排序,直接返回。
接下来,它会根据数据特征选择最优的排序策略。如果 haveDatum1
标志为 true
,且 sortKeys
存在,表示可以使用专门优化的比较器。针对不同的 comparator
,例如 ssup_datum_unsigned_cmp
、ssup_datum_signed_cmp
(仅适用于 64
位系统)或 ssup_datum_int32_cmp
,分别调用 qsort_tuple_unsigned
、qsort_tuple_signed
或 qsort_tuple_int32
进行排序,以提升性能。
如果不能使用这些优化路径,则会检查 onlyKey
是否非空,若为 true
,说明只涉及单列排序,调用 qsort_ssup
进行排序;否则,调用 qsort_tuple
,使用 comparetup
作为比较函数,执行多列排序。这样,tuplesort_sort_memtuples
在不同数据类型和排序需求下,都能高效地选择最优排序方式,提高整体查询性能。
函数源码如下:
/*
* 对内存中的元组数组进行快速排序
* 根据排序键类型选择最优化的排序实现
*/
static void
tuplesort_sort_memtuples(Tuplesortstate *state)
{
/* 断言确认当前不是并行排序的Leader进程 */
Assert(!LEADER(state));
/* 仅当内存元组数量>1时才需要排序 */
if (state->memtupcount > 1)
{
/*
* 检查是否存在可优化的排序条件:
* 1. 首列数据存储在datum1字段中
* 2. 存在有效的排序键定义
*/
if (state->base.haveDatum1 && state->base.sortKeys)
{
/* 针对无符号整型比较器的优化路径 */
if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
{
/* 调用无符号整型特化快速排序 */
qsort_tuple_unsigned(state->memtuples,
state->memtupcount,
state);
return; // 优化路径直接返回
}
#if SIZEOF_DATUM >= 8 // 64位系统专属优化
/* 针对有符号整型比较器的优化路径 */
else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
{
/* 调用有符号整型特化快速排序 */
qsort_tuple_signed(state->memtuples,
state->memtupcount,
state);
return;
}
#endif
/* 针对32位整型比较器的优化路径 */
else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
{
/* 调用32位整型特化快速排序 */
qsort_tuple_int32(state->memtuples,
state->memtupcount,
state);
return;
}
}
/* 单排序键的通用处理路径 */
if (state->base.onlyKey != NULL)
{
/* 使用标准单键快速排序实现 */
qsort_ssup(state->memtuples, state->memtupcount,
state->base.onlyKey);
}
/* 多键/复杂排序场景 */
else
{
/* 使用元组通用快速排序实现 */
qsort_tuple(state->memtuples,
state->memtupcount,
state->base.comparetup, // 自定义元组比较函数
state); // 传递状态上下文
}
}
}
优化策略与特性
- 数据类型特化: 通过识别排序键的数据类型(如无符号整数、有符号整数、32 位整数),选择专用
qsort
变体,提升性能。 - 单键优化: 单键场景下使用
qsort_ssup
,避免多键比较的复杂性。 - 条件编译:
#if SIZEOF_DATUM >= 8
确保有符号整数优化仅在64
位系统启用,增强兼容性。 - 惰性执行: 仅在元组数量大于 1 时排序,减少无用操作。
应用场景
- 内存中小数据集: 快速完成排序。
- 外部排序: 在外部排序的运行中,对内存中的元组进行预排序后写入磁带。
- 并行排序:
Worker
线程在提交结果前对本地元组排序。
inittapes 函数
inittapes
函数是 PostgreSQL
元组排序模块的核心函数之一,专门用于在内存不足以完成排序时,初始化外部磁带排序的环境。
它首先检查 mergeruns
参数,判断是否需要合并多个排序运行(runs
)。如果需要,则根据可用内存调用 tuplesort_merge_order
计算最优的输入磁带数量,以优化归并性能;如果不需要(通常发生在 Worker
线程中),则将磁带数量设为最小值,以简化处理流程。
接着,函数会记录切换到外部排序的日志(若启用了调试跟踪),并调用 inittapestate
和 LogicalTapeSetCreate
来创建和初始化逻辑磁带集。该逻辑磁带集可以支持与共享文件集关联,使得并行排序任务能够协调进行。
随后,inittapes
还会初始化运行编号、输入和输出磁带数组,以及相关的计数器等状态变量,确保磁带资源管理井然有序。
最后,函数将排序状态更新为 TSS_BUILDRUNS
,并选择一个新的磁带,以准备数据写入。通过合理分配磁带资源并设置必要的状态,inittapes
为后续的大规模外部排序奠定了坚实基础,确保 PostgreSQL
在处理超出内存容量的大规模数据排序时依然能够高效运行。
函数源码如下:
/*
* inittapes - initialize for tape sorting.
* 初始化磁带排序的函数。
*
* This is called only if we have found we won't sort in memory.
* 仅在确定无法在内存中完成排序时调用此函数。
*/
static void
inittapes(Tuplesortstate *state, bool mergeruns) // 定义静态函数 inittapes,参数包括排序状态 state 和是否需要合并运行的布尔值 mergeruns
{
Assert(!LEADER(state)); // 断言当前线程不是 Leader 线程,因为 Leader 线程通常不直接处理磁带初始化,而是协调其他线程
if (mergeruns) // 如果 mergeruns 为真,表示需要进行多运行的合并排序
{
/* Compute number of input tapes to use when merging */
/* 计算在合并时需要使用的输入磁带数量 */
state->maxTapes = tuplesort_merge_order(state->allowedMem); // 调用 tuplesort_merge_order 函数,根据允许的内存大小计算合适的输入磁带数量
}
else // 如果 mergeruns 为假,表示无需合并,通常发生在 Worker 线程生成单个运行的情况
{
/* Workers can sometimes produce single run, output without merge */
/* Worker 线程有时可以生成单个运行并直接输出,无需合并 */
Assert(WORKER(state)); // 断言当前线程是 Worker 线程,确保逻辑正确
state->maxTapes = MINORDER; // 将磁带数量设置为最小值 MINORDER(通常为 1),因为只需处理单个运行
}
if (trace_sort) // 如果启用了排序跟踪功能(trace_sort 为真)
elog(LOG, "worker %d switching to external sort with %d tapes: %s", // 调用 elog 记录日志
state->worker, state->maxTapes, pg_rusage_show(&state->ru_start)); // 日志内容包括 Worker ID、使用磁带数量及资源使用情况
/* Create the tape set */
/* 创建磁带集 */
inittapestate(state, state->maxTapes); // 调用 inittapestate 函数初始化磁带状态,准备管理磁带文件和资源
state->tapeset = // 为状态中的 tapeset 赋值,创建逻辑磁带集
LogicalTapeSetCreate(false, // 调用 LogicalTapeSetCreate 创建逻辑磁带集,第一个参数为 false 表示不使用随机访问
state->shared ? &state->shared->fileset : NULL, // 如果 state->shared 存在,则使用共享文件集,否则为 NULL
state->worker); // 传递 Worker ID,用于标识该磁带集的所有者
state->currentRun = 0; // 初始化当前运行编号为 0,表示从第一个运行开始
/*
* Initialize logical tape arrays.
* 初始化逻辑磁带数组。
*/
state->inputTapes = NULL; // 将输入磁带数组初始化为空,表示当前没有输入磁带
state->nInputTapes = 0; // 初始化输入磁带数量为 0
state->nInputRuns = 0; // 初始化输入运行数量为 0
state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *)); // 使用 palloc0 分配输出磁带数组内存,大小为 maxTapes 个指针,并初始化为 0
state->nOutputTapes = 0; // 初始化输出磁带数量为 0
state->nOutputRuns = 0; // 初始化输出运行数量为 0
state->status = TSS_BUILDRUNS; // 更新排序状态为 TSS_BUILDRUNS,表示当前处于构建运行的阶段
selectnewtape(state); // 调用 selectnewtape 函数选择一个新的磁带,用于后续元组的写入操作
}
dumptuples 函数
该函数是PostgreSQL
外部排序的核心例程,负责将内存中的排序结果持久化到磁带文件(实际为临时文件),形成可用于多路归并的初始有序段。主要实现以下关键功能:
1. 内存管控逻辑
- 动态决策转储时机:通过
LACKMEM
判断内存压力,仅在内存不足或强制模式时触发转储 - 空转储预防机制:避免生成无效空运行段(
Worker
进程例外) - 运行次数安全墙:防止
INT_MAX
溢出导致逻辑错误
2. 磁带管理
- 多磁带轮转策略:通过
selectnewtape
实现Round-robin
磁带选择 - 运行段边界标记:
markrunend
写入特殊标识区分不同数据段 - 并行
Worker
支持:维护独立的运行计数器及磁带分配
3. 排序与持久化
- 内存排序优化:调用特化的
tuplesort_sort_memtuples
进行快速排序 - 批量写入优化:采用顺序写模式最大化
I/O
效率 - 元组内存管理:重置内存上下文避免碎片,精确统计内存使用
4. 资源监控与调试
- 资源使用跟踪:通过
pg_rusage_show
记录CPU/内存/I/O数据
- 多级日志输出:记录排序开始、结束及磁带写入的关键节点
- 运行编号追踪:维护
currentRun
实现多阶段运行管理
该过程在外部排序中循环执行,直至所有数据处理完成,形成多个有序的磁带运行段,为后续的多阶段归并排序奠定基础。
函数源码如下:
/*
* 将内存中的元组转储到磁带,形成初始归并段
*
* @param state 排序状态机
* @param alltuples 是否强制转储所有内存元组
* (true表示输入数据已结束,必须全部转储)
*/
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
int memtupwrite; // 待写入的元组总数
int i; // 循环索引
/*
* 内存未耗尽且非强制转储时提前返回:
* 1. 内存元组未达容量上限
* 2. 仍有可用内存空间
* 3. 非最终强制转储模式
*/
if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
!alltuples)
return;
/*
* 处理空转储的特殊情况:
* 1. 内存中无待处理元组
* 2. 当前运行轮次>0(避免创建全空初始运行)
* 例外:Worker进程必须生成至少一个磁带
*/
if (state->memtupcount == 0 && state->currentRun > 0)
return;
// 验证当前处于构建运行阶段
Assert(state->status == TSS_BUILDRUNS);
/*
* 运行次数安全校验:
* 防止currentRun溢出INT_MAX导致不可预测行为
* 实际场景几乎不可能触发,但作为防御性编程措施
*/
if (state->currentRun == INT_MAX)
ereport(ERROR, // 抛出致命错误
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("cannot have more than %d runs for an external sort",
INT_MAX)));
/* 新运行轮次需要切换输出磁带 */
if (state->currentRun > 0)
selectnewtape(state); // 选择新的磁带进行写入
state->currentRun++; // 递增运行计数器
/* 调试日志:记录排序开始信息 */
if (trace_sort)
elog(LOG, "worker %d starting quicksort of run %d: %s",
state->worker, state->currentRun,
pg_rusage_show(&state->ru_start)); // 显示资源使用情况
/*
* 核心排序操作:
* 对当前内存中的元组执行快速排序
* 使用特化排序函数提升性能
*/
tuplesort_sort_memtuples(state);
/* 调试日志:记录排序完成信息 */
if (trace_sort)
elog(LOG, "worker %d finished quicksort of run %d: %s",
state->worker, state->currentRun,
pg_rusage_show(&state->ru_start));
/* 准备写入操作 */
memtupwrite = state->memtupcount; // 获取待写入元组总数
for (i = 0; i < memtupwrite; i++) // 遍历排序后的元组
{
SortTuple *stup = &state->memtuples[i]; // 获取当前元组指针
WRITETUP(state, state->destTape, stup); // 将元组写入目标磁带
}
state->memtupcount = 0; // 重置内存元组计数器
/*
* 重置元组内存上下文:
* 1. 释放所有已分配的元组内存
* 2. 防止内存碎片化,特别是元组大小差异较大时
* 3. 对于有界排序,避免AllocSetFree按大小分类导致的碎片
*/
MemoryContextReset(state->base.tuplecontext);
/*
* 更新内存统计:
* 1. 释放元组占用的内存配额
* 2. 重置内存使用计数器
*/
FREEMEM(state, state->tupleMem);
state->tupleMem = 0;
/* 标记当前磁带运行结束 */
markrunend(state->destTape);
/* 调试日志:记录磁带写入完成 */
if (trace_sort)
elog(LOG, "worker %d finished writing run %d to tape %d: %s",
state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
pg_rusage_show(&state->ru_start));
}
leader_takeover_tapes 函数
本函数是PostgreSQL
并行外部排序的核心协调函数,主要实现Leader
进程对Worker
排序结果的统一接管和归并准备。其核心工作流程可分为四个阶段:
- 状态安全校验
- 通过双断言确保执行环境正确(
Leader
身份、至少1
个Worker
) - 采用自旋锁(
SpinLock
)安全读取共享内存中的Worker
完成状态 - 完整性检查防止数据丢失(确保所有
Worker
完成任务)
- 通过双断言确保执行环境正确(
- 磁带系统初始化
- 复用
Worker
生成的共享文件集合(shared->fileset
) - 创建逻辑磁带集合(
LogicalTapeSet
),建立虚拟磁带管理系统 - 初始化运行计数器为
Worker
数量,建立归并轮次基础
- 复用
- 数据结构重置
- 清空输入磁带列表,准备构建多路归并的输入源
- 分配输出磁带数组,容量等于
Worker
数量 - 设置输出参数反映待处理的并行运行段
- 磁带资源整合
- 循环遍历每个
Worker
,通过LogicalTapeImport
将其物理文件抽象为逻辑磁带 - 将
Worker
的输出磁带挂载到Leader
的磁带管理系统 - 最终状态切换至
TSS_BUILDRUNS
,标志初始归并段构建完成
函数源码如下:
- 循环遍历每个
/*
* Leader进程接管所有Worker生成的排序磁带,构建归并所需的磁带集合
* 使Leader进入与串行外部排序相同的状态,准备执行多路归并
*/
static void
leader_takeover_tapes(Tuplesortstate *state)
{
Sharedsort *shared = state->shared; // 获取共享内存控制结构
int nParticipants = state->nParticipants; // 参与的Worker总数
int workersFinished; // 实际完成的Worker计数器
int j; // 循环索引
/* 核心断言校验 */
Assert(LEADER(state)); // 确保当前是Leader进程
Assert(nParticipants >= 1); // 至少存在1个Worker参与
/* 安全获取Worker完成状态 */
SpinLockAcquire(&shared->mutex); // 获取自旋锁保护共享内存
workersFinished = shared->workersFinished; // 读取已完成的Worker数量
SpinLockRelease(&shared->mutex); // 释放自旋锁
/* 完整性检查:所有Worker必须完成排序 */
if (nParticipants != workersFinished)
elog(ERROR, "cannot take over tapes before all workers finish");
/*
* 初始化磁带系统:
* 1. 创建包含Worker磁带的逻辑磁带集合
* 2. 参数说明:
* - false: 表示不立即创建物理文件
* - &shared->fileset: 复用Worker的共享文件集合
* - -1: 临时文件使用默认编号
*/
inittapestate(state, nParticipants); // 初始化磁带管理状态
state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
/*
* 设置currentRun为Worker数量:
* 此值后续用于控制归并轮次,每个Worker对应一个初始运行段
*/
state->currentRun = nParticipants;
/*
* 重置输入输出状态,模拟串行排序的初始运行完成状态:
* - inputTapes: 归并输入磁带数组(初始为空)
* - nInputTapes/Runs: 输入磁带/运行数清零
* - outputTapes: 分配Worker数量大小的输出磁带数组
* - nOutputTapes/Runs: 设置为Worker数量,表示待处理运行段
*/
state->inputTapes = NULL;
state->nInputTapes = 0;
state->nInputRuns = 0;
state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *)); // 清零分配
state->nOutputTapes = nParticipants; // 输出磁带数=Worker数
state->nOutputRuns = nParticipants; // 每个Worker对应1个运行段
/* 循环导入每个Worker的磁带 */
for (j = 0; j < nParticipants; j++)
{
/*
* 将Worker的磁带导入Leader的磁带集合:
* - j: Worker的编号标识
* - &shared->tapes[j]: 对应Worker的磁带元数据
*/
state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
}
/* 更新状态机至构建运行完成阶段 */
state->status = TSS_BUILDRUNS;
}
mergeruns 函数
该函数是PostgreSQL
外部排序的核心归并引擎,负责将多个已排序的初始运行(磁带上的有序数据段)通过多轮平衡K路归并,最终生成完全有序的结果。主要功能包括:
1. 预处理阶段
- 禁用缩写键优化,确保使用完整比较器
- 重构内存管理系统,切换为
slab
分配器优化小对象分配 - 初始化
K
路归并堆结构,分配每个磁带的元组槽位
2. 多阶段归并
- 磁带轮转机制:将上一轮的输出磁带作为下一轮的输入
- 动态缓冲区分配:根据剩余内存计算最优
I/O
缓冲区大小 - 归并策略选择:自动判断是否进入最终单次归并阶段
- 平衡归并执行:每次从
N
个输入磁带选取最小元素写入新运行
3. 终止与收尾
- 结果磁带冻结:阻止后续修改,准备读取
- 资源清理:释放缓冲区内存,关闭磁带句柄
- 状态更新:标记排序完成状态
函数源码如下:
/*
* 执行多阶段归并排序,将多个有序运行段合并为最终有序结果
* 实现平衡K路归并算法,处理可能的多轮归并过程
*/
static void
mergeruns(Tuplesortstate *state)
{
int tapenum; // 磁带编号迭代变量
/* 校验当前状态:必须处于构建运行完成阶段且内存无剩余元组 */
Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
/* 禁用缩写键优化:磁盘数据不存储缩写键,需使用完整比较器 */
if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
{
state->base.sortKeys->abbrev_converter = NULL; // 清空缩写转换器
state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator; // 切换完整比较器
/* 清理相关字段保持结构整洁 */
state->base.sortKeys->abbrev_abort = NULL;
state->base.sortKeys->abbrev_full_comparator = NULL;
}
/* 重置元组内存上下文:释放所有已分配元组,切换slab分配策略 */
MemoryContextResetOnly(state->base.tuplecontext);
/* 释放原始大内存数组,后续使用更紧凑的堆结构 */
FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
pfree(state->memtuples);
state->memtuples = NULL;
/* 初始化slab分配器:每个输入磁带分配1槽 + 结果保留槽 */
if (state->base.tuples)
init_slab_allocator(state, state->nOutputTapes + 1); // 元组类型需要内存管理
else
init_slab_allocator(state, 0); // 值类型无需额外分配
/* 初始化归并堆:分配N路归并所需的元组槽 */
state->memtupsize = state->nOutputTapes; // 堆容量等于输入磁带数
state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
state->nOutputTapes * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples)); // 更新内存统计
/* 分配磁带缓冲区内存:使用全部剩余可用内存 */
state->tape_buffer_mem = state->availMem;
USEMEM(state, state->tape_buffer_mem);
if (trace_sort) // 调试日志:记录内存分配
elog(LOG, "worker %d using %zu KB of memory for tape buffers",
state->worker, state->tape_buffer_mem / 1024);
/* 多阶段归并主循环 */
for (;;)
{
/* 新归并轮次初始化:当输入运行耗尽时重新配置磁带 */
if (state->nInputRuns == 0)
{
int64 input_buffer_size;
/* 清理上一轮输入磁带 */
if (state->nInputTapes > 0)
{
for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
LogicalTapeClose(state->inputTapes[tapenum]); // 关闭磁带句柄
pfree(state->inputTapes); // 释放输入磁带数组
}
/* 轮转磁带角色:上轮输出变为本轮输入 */
state->inputTapes = state->outputTapes;
state->nInputTapes = state->nOutputTapes;
state->nInputRuns = state->nOutputRuns;
/* 初始化新输出磁带集合 */
state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
state->nOutputTapes = 0;
state->nOutputRuns = 0;
/* 计算输入缓冲区大小(平衡I/O与内存使用) */
input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
state->nInputTapes,
state->nInputRuns,
state->maxTapes);
/* 调试日志:记录归并轮次开始 */
if (trace_sort)
elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB for input: %s",
state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
pg_rusage_show(&state->ru_start));
/* 准备输入磁带:倒带并设置读取缓冲区 */
for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
/* 最终合并条件判断:单轮归并可结束 */
if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0 // 无需随机访问
&& state->nInputRuns <= state->nInputTapes // 运行数<=磁带数
&& !WORKER(state)) // 非工作进程
{
LogicalTapeSetForgetFreeSpace(state->tapeset); // 停止空间回收
beginmerge(state); // 初始化最终归并
state->status = TSS_FINALMERGE; // 更新状态
return; // 进入最终归并阶段
}
}
/* 选择输出磁带并执行单次归并 */
selectnewtape(state); // 分配新输出磁带
mergeonerun(state); // 合并一个完整运行
/* 终止条件:输入耗尽且输出单一运行 */
if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
break;
}
/* 归并完成处理 */
state->result_tape = state->outputTapes[0]; // 设置结果磁带
if (!WORKER(state))
LogicalTapeFreeze(state->result_tape, NULL); // 主进程冻结磁带
else
worker_freeze_result_tape(state); // 工作进程特殊处理
state->status = TSS_SORTEDONTAPE; // 更新为磁带已排序状态
/* 资源清理:关闭所有输入磁带释放缓冲区 */
for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
LogicalTapeClose(state->inputTapes[tapenum]);
}