【PostgreSQL内核学习:深入理解 PostgreSQL 中的 tuplesort_performsort 函数】

news2025/4/5 10:01:28

深入理解 PostgreSQL 中的 tuplesort_performsort 函数

  • 函数概述
    • 函数源码
    • 函数签名
    • 核心功能
    • 相关函数简介
  • 代码结构与逻辑分析
    • 1. 内存上下文切换
    • 2. 调试跟踪(可选)
    • 3. 状态机逻辑(switch 分支)
    • 4. 调试跟踪(完成时)
    • 5. 恢复内存上下文
  • 关键特性与优化
    • 1. 状态机设计
    • 2. 状态转换图
    • 3. 混合排序策略
    • 4. 资源管理哲学
    • 5. 异常处理机制
  • 相关函数介绍
    • tuplesort_sort_memtuples 函数
      • 优化策略与特性
      • 应用场景
    • inittapes 函数
    • dumptuples 函数
    • leader_takeover_tapes 函数
    • mergeruns 函数

声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 postgresql-15.0 的开源代码和《PostgresSQL数据库内核分析》一书

  在 PostgreSQL 的元组排序(tuplesort)模块中,tuplesort_performsort 是一个核心函数,负责执行排序的主要逻辑。它根据当前的排序状态(state->status选择不同的排序策略,处理串行并行排序场景,并确保内存管理的高效性。本文将详细分析该函数的实现细节,探讨其作用、逻辑结构、状态转换以及相关优化策略,以帮助读者更好地理解 PostgreSQL 的排序机制。其中,tuplesort_performsort函数的源码流程可见【PostgreSQL内核学习 —— (sort算子)】。

函数概述

函数源码

/*
 * 执行排序的主入口函数,处理不同排序阶段的逻辑
 */
void
tuplesort_performsort(Tuplesortstate *state)
{
    // 切换到排序专用的内存上下文,确保内存分配在正确环境中进行
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT  // 调试跟踪模块
    if (trace_sort)
        elog(LOG, "performsort of worker %d starting: %s",
             state->worker, pg_rusage_show(&state->ru_start));  // 记录排序开始时的资源使用情况
#endif

    // 根据当前排序状态选择处理逻辑
    switch (state->status)
    {
        case TSS_INITIAL:  // 初始状态(未开始排序)
            if (SERIAL(state)) {  // 串行模式
                /* 单线程快速排序 */
                tuplesort_sort_memtuples(state);  // 对内存中的元组数组执行快速排序
                state->status = TSS_SORTEDINMEM;  // 更新状态为"内存已排序"
            } else if (WORKER(state)) {  // 并行工作线程模式
                /* 
                 * 并行Worker将数据写入磁带:
                 * 1. 初始化磁带存储(inittapes)
                 * 2. 将内存数据转储到磁带(dumptuples)
                 * 3. 标记不需要合并运行(worker_nomergeruns)
                 */
                inittapes(state, false);          // 初始化磁带文件
                dumptuples(state, true);          // 将内存元组写入磁带
                worker_nomergeruns(state);        // 设置Worker不参与合并
                state->status = TSS_SORTEDONTAPE;  // 更新状态为"磁带已排序"
            } else {  // 并行Leader模式
                /* 
                 * Leader接管Worker的磁带并执行多路归并:
                 * 1. 接管磁带资源(leader_takeover_tapes)
                 * 2. 执行多路归并排序(mergeruns)
                 */
                leader_takeover_tapes(state);     // 接管Worker的磁带资源
                mergeruns(state);                 // 执行多路归并排序
            }
            // 重置迭代器状态
            state->current = 0;                   // 重置当前读取位置
            state->eof_reached = false;           // 清除EOF标记
            state->markpos_block = 0L;            // 重置标记位置块号
            state->markpos_offset = 0;            // 重置标记位置偏移量
            state->markpos_eof = false;           // 清除标记EOF状态
            break;

        case TSS_BOUNDED:  // 有界堆排序状态
            /* 
             * 转换有界堆为有序数组:
             * 1. 调用sort_bounded_heap处理堆结构
             * 2. 更新状态为"内存已排序"
             */
            sort_bounded_heap(state);             // 将有界堆转换为有序数组
            state->current = 0;
            state->eof_reached = false;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            state->status = TSS_SORTEDINMEM;      // 更新状态
            break;

        case TSS_BUILDRUNS:  // 磁带构建运行状态
            /*
             * 完成磁带排序:
             * 1. 将内存剩余元组写入磁带(dumptuples)
             * 2. 执行多路归并(mergeruns)
             */
            dumptuples(state, true);              // 最终转储内存数据
            mergeruns(state);                     // 执行磁带合并
            state->eof_reached = false;
            state->markpos_block = 0L;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            break;

        default:  // 无效状态处理
            elog(ERROR, "invalid tuplesort state");  // 抛出错误
            break;
    }

#ifdef TRACE_SORT  // 调试跟踪模块
    if (trace_sort) {
        if (state->status == TSS_FINALMERGE)  // 最终合并阶段日志
            elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
                 state->worker, state->nInputTapes, pg_rusage_show(&state->ru_start));
        else  // 常规完成日志
            elog(LOG, "performsort of worker %d done: %s",
                 state->worker, pg_rusage_show(&state->ru_start));
    }
#endif

    // 恢复原始内存上下文
    MemoryContextSwitchTo(oldcontext);
}

函数签名

void tuplesort_performsort(Tuplesortstate *state);
  • 参数Tuplesortstate *state 是一个指向排序状态结构的指针,包含了排序过程中的所有关键信息,如当前状态、内存数据、磁带文件等。
  • 作用:作为排序执行的入口点,tuplesort_performsort 根据状态机的设计,处理不同的排序阶段,包括内存排序、磁带写入和多路归并等。

核心功能

  • 根据 state->status 的值,决定是执行内存快速排序将数据写入磁带,还是进行多路归并
  • 支持串行模式并行模式(包括 WorkerLeader 角色)。
  • 管理内存上下文,确保资源分配和释放的正确性。
  • 重置迭代器状态,为后续读取操作做好准备。

相关函数简介

  • tuplesort_sort_memtuples:内存中的快速排序实现。
  • inittapes:初始化磁带文件结构。
  • dumptuples:将内存元组写入磁带。
  • worker_nomergeruns:标记 Worker 无需参与合并。
  • leader_takeover_tapesLeader 接管 Worker 的磁带资源。
  • mergeruns:执行多路归并排序。
  • sort_bounded_heap:将有界堆转换为有序数组。

这些函数共同构成了 tuplesort 模块的功能链。

代码结构与逻辑分析

  以下是 tuplesort_performsort 的完整代码分解与分析:

1. 内存上下文切换

MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
  • 目的:将当前内存上下文切换到排序专用的 state->sortcontext,确保排序过程中分配的内存都在这个上下文中,便于统一管理和释放。
  • 重要性PostgreSQL 使用内存上下文来隔离不同操作的内存分配,防止内存泄漏或冲突。

在函数结束时,会通过 MemoryContextSwitchTo(oldcontext) 恢复原始上下文。

2. 调试跟踪(可选)

#ifdef TRACE_SORT
if (trace_sort)
    elog(LOG, "performsort of worker %d starting: %s",
         state->worker, pg_rusage_show(&state->ru_start));
#endif
  • 作用:如果启用了 TRACE_SORT 宏,记录排序开始时的资源使用情况(如 CPU 和内存)。
  • 工具pg_rusage_show 用于显示资源使用统计,便于性能分析和调试。

类似的调试日志也会在排序完成后记录,区分是否处于最终合并阶段。

3. 状态机逻辑(switch 分支)

  tuplesort_performsort 的核心逻辑基于 state->status 的值,通过 switch-case 结构分支执行:
TSS_INITIAL:初始状态

case TSS_INITIAL:
    if (SERIAL(state)) {  // 串行模式
        tuplesort_sort_memtuples(state);  // 内存快速排序
        state->status = TSS_SORTEDINMEM;  // 更新状态
    } else if (WORKER(state)) {  // 并行 Worker 模式
        inittapes(state, false);          // 初始化磁带
        dumptuples(state, true);          // 写入磁带
        worker_nomergeruns(state);        // 标记无需合并
        state->status = TSS_SORTEDONTAPE; // 更新状态
    } else {  // 并行 Leader 模式
        leader_takeover_tapes(state);     // 接管磁带
        mergeruns(state);                 // 多路归并
    }
    // 重置迭代器状态
    state->current = 0;
    state->eof_reached = false;
    state->markpos_block = 0L;
    state->markpos_offset = 0;
    state->markpos_eof = false;
    break;

串行模式 (SERIAL(state)):

调用 tuplesort_sort_memtuples 对内存中的元组数组执行快速排序。
状态更新为 TSS_SORTEDINMEM,表示内存已排序完成。

并行 Worker 模式 (WORKER(state)):

初始化磁带文件(inittapes)。
将内存中的元组写入磁带(dumptuples)。
调用 worker_nomergeruns 标记 Worker 不参与后续合并。
状态更新为 TSS_SORTEDONTAPE,表示数据已排序并存储在磁带上。

并行 Leader 模式:

调用 leader_takeover_tapes 接管 Worker 的磁带资源。
执行 mergeruns 进行多路归并排序。

迭代器重置:无论哪种模式,都会重置读取相关的状态变量(如 currenteof_reached)。


TSS_BOUNDED:有界堆排序状态

case TSS_BOUNDED:
    sort_bounded_heap(state);             // 转换堆为有序数组
    state->current = 0;
    state->eof_reached = false;
    state->markpos_offset = 0;
    state->markpos_eof = false;
    state->status = TSS_SORTEDINMEM;      // 更新状态
    break;
  • 逻辑

    • 调用 sort_bounded_heap 将有界堆转换为有序数组
    • 更新状态为 TSS_SORTEDINMEM
  • 适用场景:用于需要限制内存使用的排序(如 Top-N 查询)。


TSS_BUILDRUNS:构建运行状态

case TSS_BUILDRUNS:
    dumptuples(state, true);              // 写入剩余元组
    mergeruns(state);                     // 多路归并
    state->eof_reached = false;
    state->markpos_block = 0L;
    state->markpos_offset = 0;
    state->markpos_eof = false;
    break;
  • 逻辑

    • 将内存中剩余的元组写入磁带(dumptuples)。
    • 执行多路归并(mergeruns)完成排序。
  • 适用场景:处理大数据集时,内存不足以容纳所有元组,需分阶段写入磁带。


默认分支:无效状态

default:
    elog(ERROR, "invalid tuplesort state");
    break;
  • 作用:如果 state->status 不匹配任何已知状态,抛出错误,确保程序健壮性。

4. 调试跟踪(完成时)

#ifdef TRACE_SORT
if (trace_sort) {
    if (state->status == TSS_FINALMERGE)
        elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
             state->worker, state->nInputTapes, pg_rusage_show(&state->ru_start));
    else
        elog(LOG, "performsort of worker %d done: %s",
             state->worker, pg_rusage_show(&state->ru_start));
}
#endif
  • 作用:记录排序完成时的资源使用情况,特别区分是否处于最终合并阶段(TSS_FINALMERGE)。

5. 恢复内存上下文

MemoryContextSwitchTo(oldcontext);
  • 目的:恢复调用前的内存上下文,确保不影响外部环境。

关键特性与优化

1. 状态机设计

Tuplesortstate 中的 status 字段定义了排序的不同阶段:

  • TSS_INITIAL:初始状态,未开始排序。
  • TSS_BOUNDED:有界堆排序状态,用于内存受限场景。
  • TSS_BUILDRUNS:构建运行状态,处理大数据集。
  • TSS_SORTEDINMEM:内存已排序完成。
  • TSS_SORTEDONTAPE:磁带已排序完成。
  • TSS_FINALMERGE:最终合并阶段(调试日志中提及)。

这种状态机设计使得函数逻辑清晰,能够灵活应对不同场景。

2. 状态转换图

  以下是 tuplesort_performsort 中状态转换的简化流程:

[TSS_INITIAL]
  ├─SERIAL → 快速排序 → [TSS_SORTEDINMEM]
  ├─WORKER → 磁带写入 → [TSS_SORTEDONTAPE] 
  └─LEADER → 多路归并 → [TSS_FINALMERGE]

TSS_BOUNDED → TSS_SORTEDINMEM

TSS_BUILDRUNS → (完成合并后,可能进入 TSS_SORTEDONTAPE 或 TSS_FINALMERGE)

3. 混合排序策略

  根据数据规模动态选择算法:

数据量 < work_mem → 快速排序
work_mem < 数据量 < 10*work_mem → 堆排序
数据量 > 10*work_mem → 多阶段归并

4. 资源管理哲学

  • 内存优先:最大限度利用可用内存
  • 渐进式落盘:按需写入临时文件
  • 并行流水线:计算与I/O重叠

5. 异常处理机制

  • 内存超限:触发"soft"错误可恢复
  • 磁盘空间不足:硬错误终止
  • 并行协调:通过共享内存检测Worker状态

相关函数介绍

tuplesort_sort_memtuples 函数

  本函数是PostgreSQL内存排序的核心实现,根据不同的数据类型特征选择最优化的快速排序策略。主要执行流程为:
  tuplesort_sort_memtuplesPostgreSQL tuplesort 机制中的核心函数之一负责对 memtuples 数组中的元组执行快速排序 (quicksort)。首先,它会检查 LEADER(state) 以确保当前不是 Leader 线程,然后判断 memtupcount 是否大于 1,如果只有一个或没有元组,则无需排序,直接返回。
  接下来,它会根据数据特征选择最优的排序策略。如果 haveDatum1 标志为 true,且 sortKeys 存在,表示可以使用专门优化的比较器。针对不同的 comparator,例如 ssup_datum_unsigned_cmpssup_datum_signed_cmp(仅适用于 64 位系统)或 ssup_datum_int32_cmp,分别调用 qsort_tuple_unsignedqsort_tuple_signedqsort_tuple_int32 进行排序,以提升性能。
  如果不能使用这些优化路径,则会检查 onlyKey 是否非空,若为 true,说明只涉及单列排序,调用 qsort_ssup 进行排序;否则,调用 qsort_tuple,使用 comparetup 作为比较函数,执行多列排序。这样,tuplesort_sort_memtuples 在不同数据类型和排序需求下,都能高效地选择最优排序方式,提高整体查询性能。
  函数源码如下:

/* 
 * 对内存中的元组数组进行快速排序
 * 根据排序键类型选择最优化的排序实现
 */
static void
tuplesort_sort_memtuples(Tuplesortstate *state)
{
    /* 断言确认当前不是并行排序的Leader进程 */
    Assert(!LEADER(state));

    /* 仅当内存元组数量>1时才需要排序 */
    if (state->memtupcount > 1)
    {
        /*
         * 检查是否存在可优化的排序条件:
         * 1. 首列数据存储在datum1字段中
         * 2. 存在有效的排序键定义
         */
        if (state->base.haveDatum1 && state->base.sortKeys)
        {
            /* 针对无符号整型比较器的优化路径 */
            if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
            {
                /* 调用无符号整型特化快速排序 */
                qsort_tuple_unsigned(state->memtuples,
                                     state->memtupcount,
                                     state);
                return; // 优化路径直接返回
            }
#if SIZEOF_DATUM >= 8  // 64位系统专属优化
            /* 针对有符号整型比较器的优化路径 */
            else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
            {
                /* 调用有符号整型特化快速排序 */
                qsort_tuple_signed(state->memtuples,
                                   state->memtupcount,
                                   state);
                return;
            }
#endif
            /* 针对32位整型比较器的优化路径 */
            else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
            {
                /* 调用32位整型特化快速排序 */
                qsort_tuple_int32(state->memtuples,
                                  state->memtupcount,
                                  state);
                return;
            }
        }

        /* 单排序键的通用处理路径 */
        if (state->base.onlyKey != NULL)
        {
            /* 使用标准单键快速排序实现 */
            qsort_ssup(state->memtuples, state->memtupcount,
                       state->base.onlyKey);
        }
        /* 多键/复杂排序场景 */
        else
        {
            /* 使用元组通用快速排序实现 */
            qsort_tuple(state->memtuples,
                        state->memtupcount,
                        state->base.comparetup,  // 自定义元组比较函数
                        state);                  // 传递状态上下文
        }
    }
}

优化策略与特性

  • 数据类型特化: 通过识别排序键的数据类型(如无符号整数有符号整数32 位整数),选择专用 qsort 变体,提升性能。
  • 单键优化: 单键场景下使用 qsort_ssup,避免多键比较的复杂性。
  • 条件编译: #if SIZEOF_DATUM >= 8 确保有符号整数优化仅在 64 位系统启用,增强兼容性。
  • 惰性执行: 仅在元组数量大于 1 时排序,减少无用操作。

应用场景

  • 内存中小数据集: 快速完成排序。
  • 外部排序: 在外部排序的运行中,对内存中的元组进行预排序后写入磁带。
  • 并行排序: Worker 线程在提交结果前对本地元组排序。

inittapes 函数

  inittapes 函数是 PostgreSQL 元组排序模块的核心函数之一,专门用于在内存不足以完成排序时初始化外部磁带排序的环境
  它首先检查 mergeruns 参数,判断是否需要合并多个排序运行runs)。如果需要,则根据可用内存调用 tuplesort_merge_order 计算最优的输入磁带数量,以优化归并性能;如果不需要(通常发生在 Worker 线程中),则将磁带数量设为最小值,以简化处理流程。
  接着,函数会记录切换到外部排序的日志(若启用了调试跟踪),并调用 inittapestateLogicalTapeSetCreate 来创建和初始化逻辑磁带集。该逻辑磁带集可以支持与共享文件集关联,使得并行排序任务能够协调进行。
  随后,inittapes 还会初始化运行编号、输入和输出磁带数组,以及相关的计数器等状态变量,确保磁带资源管理井然有序。
  最后,函数将排序状态更新为 TSS_BUILDRUNS,并选择一个新的磁带,以准备数据写入。通过合理分配磁带资源并设置必要的状态inittapes 为后续的大规模外部排序奠定了坚实基础,确保 PostgreSQL 在处理超出内存容量的大规模数据排序时依然能够高效运行。
  函数源码如下:

/*
 * inittapes - initialize for tape sorting.
 * 初始化磁带排序的函数。
 *
 * This is called only if we have found we won't sort in memory.
 * 仅在确定无法在内存中完成排序时调用此函数。
 */
static void
inittapes(Tuplesortstate *state, bool mergeruns)  // 定义静态函数 inittapes,参数包括排序状态 state 和是否需要合并运行的布尔值 mergeruns
{
	Assert(!LEADER(state));  // 断言当前线程不是 Leader 线程,因为 Leader 线程通常不直接处理磁带初始化,而是协调其他线程

	if (mergeruns)  // 如果 mergeruns 为真,表示需要进行多运行的合并排序
	{
		/* Compute number of input tapes to use when merging */
		/* 计算在合并时需要使用的输入磁带数量 */
		state->maxTapes = tuplesort_merge_order(state->allowedMem);  // 调用 tuplesort_merge_order 函数,根据允许的内存大小计算合适的输入磁带数量
	}
	else  // 如果 mergeruns 为假,表示无需合并,通常发生在 Worker 线程生成单个运行的情况
	{
		/* Workers can sometimes produce single run, output without merge */
		/* Worker 线程有时可以生成单个运行并直接输出,无需合并 */
		Assert(WORKER(state));  // 断言当前线程是 Worker 线程,确保逻辑正确
		state->maxTapes = MINORDER;  // 将磁带数量设置为最小值 MINORDER(通常为 1),因为只需处理单个运行
	}

	if (trace_sort)  // 如果启用了排序跟踪功能(trace_sort 为真)
		elog(LOG, "worker %d switching to external sort with %d tapes: %s",  // 调用 elog 记录日志
			 state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));  // 日志内容包括 Worker ID、使用磁带数量及资源使用情况

	/* Create the tape set */
	/* 创建磁带集 */
	inittapestate(state, state->maxTapes);  // 调用 inittapestate 函数初始化磁带状态,准备管理磁带文件和资源
	state->tapeset =  // 为状态中的 tapeset 赋值,创建逻辑磁带集
		LogicalTapeSetCreate(false,  // 调用 LogicalTapeSetCreate 创建逻辑磁带集,第一个参数为 false 表示不使用随机访问
							 state->shared ? &state->shared->fileset : NULL,  // 如果 state->shared 存在,则使用共享文件集,否则为 NULL
							 state->worker);  // 传递 Worker ID,用于标识该磁带集的所有者

	state->currentRun = 0;  // 初始化当前运行编号为 0,表示从第一个运行开始

	/*
	 * Initialize logical tape arrays.
	 * 初始化逻辑磁带数组。
	 */
	state->inputTapes = NULL;  // 将输入磁带数组初始化为空,表示当前没有输入磁带
	state->nInputTapes = 0;    // 初始化输入磁带数量为 0
	state->nInputRuns = 0;     // 初始化输入运行数量为 0

	state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));  // 使用 palloc0 分配输出磁带数组内存,大小为 maxTapes 个指针,并初始化为 0
	state->nOutputTapes = 0;   // 初始化输出磁带数量为 0
	state->nOutputRuns = 0;    // 初始化输出运行数量为 0

	state->status = TSS_BUILDRUNS;  // 更新排序状态为 TSS_BUILDRUNS,表示当前处于构建运行的阶段

	selectnewtape(state);  // 调用 selectnewtape 函数选择一个新的磁带,用于后续元组的写入操作
}

dumptuples 函数

  该函数是PostgreSQL外部排序的核心例程,负责将内存中的排序结果持久化到磁带文件(实际为临时文件),形成可用于多路归并的初始有序段。主要实现以下关键功能:

​1. 内存管控逻辑

  • 动态决策转储时机:通过LACKMEM判断内存压力,仅在内存不足或强制模式时触发转储
  • 空转储预防机制:避免生成无效空运行段(Worker进程例外)
  • 运行次数安全墙:防止INT_MAX溢出导致逻辑错误

​2. 磁带管理

  • 多磁带轮转策略:通过selectnewtape实现Round-robin磁带选择
  • 运行段边界标记markrunend写入特殊标识区分不同数据段
  • 并行Worker支持:维护独立的运行计数器及磁带分配

​3. 排序与持久化

  • 内存排序优化:调用特化的tuplesort_sort_memtuples进行快速排序
  • 批量写入优化:采用顺序写模式最大化I/O效率
  • 元组内存管理:重置内存上下文避免碎片,精确统计内存使用

​4. 资源监控与调试

  • 资源使用跟踪:通过pg_rusage_show记录CPU/内存/I/O数据
  • 多级日志输出:记录排序开始、结束及磁带写入的关键节点
  • 运行编号追踪:维护currentRun实现多阶段运行管理

  该过程在外部排序中循环执行,直至所有数据处理完成,形成多个有序的磁带运行段,为后续的多阶段归并排序奠定基础。
  函数源码如下:

/*
 * 将内存中的元组转储到磁带,形成初始归并段
 * 
 * @param state 排序状态机
 * @param alltuples 是否强制转储所有内存元组
 *                  (true表示输入数据已结束,必须全部转储)
 */
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
    int         memtupwrite;  // 待写入的元组总数
    int         i;            // 循环索引

    /*
     * 内存未耗尽且非强制转储时提前返回:
     * 1. 内存元组未达容量上限
     * 2. 仍有可用内存空间
     * 3. 非最终强制转储模式
     */
    if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
        !alltuples)
        return;

    /*
     * 处理空转储的特殊情况:
     * 1. 内存中无待处理元组
     * 2. 当前运行轮次>0(避免创建全空初始运行)
     * 例外:Worker进程必须生成至少一个磁带
     */
    if (state->memtupcount == 0 && state->currentRun > 0)
        return;

    // 验证当前处于构建运行阶段
    Assert(state->status == TSS_BUILDRUNS);

    /*
     * 运行次数安全校验:
     * 防止currentRun溢出INT_MAX导致不可预测行为
     * 实际场景几乎不可能触发,但作为防御性编程措施
     */
    if (state->currentRun == INT_MAX)
        ereport(ERROR,  // 抛出致命错误
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot have more than %d runs for an external sort",
                        INT_MAX)));

    /* 新运行轮次需要切换输出磁带 */
    if (state->currentRun > 0)
        selectnewtape(state);  // 选择新的磁带进行写入

    state->currentRun++;  // 递增运行计数器

    /* 调试日志:记录排序开始信息 */
    if (trace_sort)
        elog(LOG, "worker %d starting quicksort of run %d: %s",
             state->worker, state->currentRun,
             pg_rusage_show(&state->ru_start));  // 显示资源使用情况

    /*
     * 核心排序操作:
     * 对当前内存中的元组执行快速排序
     * 使用特化排序函数提升性能
     */
    tuplesort_sort_memtuples(state);

    /* 调试日志:记录排序完成信息 */
    if (trace_sort)
        elog(LOG, "worker %d finished quicksort of run %d: %s",
             state->worker, state->currentRun,
             pg_rusage_show(&state->ru_start));

    /* 准备写入操作 */
    memtupwrite = state->memtupcount;  // 获取待写入元组总数
    for (i = 0; i < memtupwrite; i++)  // 遍历排序后的元组
    {
        SortTuple  *stup = &state->memtuples[i];  // 获取当前元组指针

        WRITETUP(state, state->destTape, stup);  // 将元组写入目标磁带
    }

    state->memtupcount = 0;  // 重置内存元组计数器

    /*
     * 重置元组内存上下文:
     * 1. 释放所有已分配的元组内存
     * 2. 防止内存碎片化,特别是元组大小差异较大时
     * 3. 对于有界排序,避免AllocSetFree按大小分类导致的碎片
     */
    MemoryContextReset(state->base.tuplecontext);

    /*
     * 更新内存统计:
     * 1. 释放元组占用的内存配额
     * 2. 重置内存使用计数器
     */
    FREEMEM(state, state->tupleMem);
    state->tupleMem = 0;

    /* 标记当前磁带运行结束 */
    markrunend(state->destTape);

    /* 调试日志:记录磁带写入完成 */
    if (trace_sort)
        elog(LOG, "worker %d finished writing run %d to tape %d: %s",
             state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
             pg_rusage_show(&state->ru_start));
}

leader_takeover_tapes 函数

  本函数是PostgreSQL并行外部排序的核心协调函数,主要实现Leader进程对Worker排序结果的统一接管和归并准备。其核心工作流程可分为四个阶段

  1. 状态安全校验
    • 通过双断言确保执行环境正确(Leader身份、至少1Worker
    • 采用自旋锁(SpinLock)安全读取共享内存中的Worker完成状态
    • 完整性检查防止数据丢失(确保所有Worker完成任务)
  2. ​磁带系统初始化
    • 复用Worker生成的共享文件集合(shared->fileset
    • 创建逻辑磁带集合(LogicalTapeSet),建立虚拟磁带管理系统
    • 初始化运行计数器为Worker数量,建立归并轮次基础
  3. ​数据结构重置
    • 清空输入磁带列表,准备构建多路归并的输入源
    • 分配输出磁带数组,容量等于Worker数量
    • 设置输出参数反映待处理的并行运行段
  4. 磁带资源整合
    • 循环遍历每个Worker,通过LogicalTapeImport将其物理文件抽象为逻辑磁带
    • Worker的输出磁带挂载到Leader的磁带管理系统
    • 最终状态切换至TSS_BUILDRUNS,标志初始归并段构建完成
        函数源码如下:
/*
 * Leader进程接管所有Worker生成的排序磁带,构建归并所需的磁带集合
 * 使Leader进入与串行外部排序相同的状态,准备执行多路归并
 */
static void
leader_takeover_tapes(Tuplesortstate *state)
{
    Sharedsort *shared = state->shared;         // 获取共享内存控制结构
    int         nParticipants = state->nParticipants; // 参与的Worker总数
    int         workersFinished;               // 实际完成的Worker计数器
    int         j;                             // 循环索引

    /* 核心断言校验 */
    Assert(LEADER(state));                     // 确保当前是Leader进程
    Assert(nParticipants >= 1);                // 至少存在1个Worker参与

    /* 安全获取Worker完成状态 */
    SpinLockAcquire(&shared->mutex);           // 获取自旋锁保护共享内存
    workersFinished = shared->workersFinished; // 读取已完成的Worker数量
    SpinLockRelease(&shared->mutex);           // 释放自旋锁

    /* 完整性检查:所有Worker必须完成排序 */
    if (nParticipants != workersFinished)
        elog(ERROR, "cannot take over tapes before all workers finish");

    /*
     * 初始化磁带系统:
     * 1. 创建包含Worker磁带的逻辑磁带集合
     * 2. 参数说明:
     *    - false: 表示不立即创建物理文件
     *    - &shared->fileset: 复用Worker的共享文件集合
     *    - -1: 临时文件使用默认编号
     */
    inittapestate(state, nParticipants);       // 初始化磁带管理状态
    state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);

    /* 
     * 设置currentRun为Worker数量:
     * 此值后续用于控制归并轮次,每个Worker对应一个初始运行段
     */
    state->currentRun = nParticipants;

    /* 
     * 重置输入输出状态,模拟串行排序的初始运行完成状态:
     * - inputTapes: 归并输入磁带数组(初始为空)
     * - nInputTapes/Runs: 输入磁带/运行数清零
     * - outputTapes: 分配Worker数量大小的输出磁带数组
     * - nOutputTapes/Runs: 设置为Worker数量,表示待处理运行段
     */
    state->inputTapes = NULL;
    state->nInputTapes = 0;
    state->nInputRuns = 0;

    state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *)); // 清零分配
    state->nOutputTapes = nParticipants;       // 输出磁带数=Worker数
    state->nOutputRuns = nParticipants;        // 每个Worker对应1个运行段

    /* 循环导入每个Worker的磁带 */
    for (j = 0; j < nParticipants; j++)
    {
        /* 
         * 将Worker的磁带导入Leader的磁带集合:
         * - j: Worker的编号标识
         * - &shared->tapes[j]: 对应Worker的磁带元数据
         */
        state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
    }

    /* 更新状态机至构建运行完成阶段 */
    state->status = TSS_BUILDRUNS;
}

mergeruns 函数

  该函数是PostgreSQL外部排序的核心归并引擎,负责将多个已排序的初始运行(磁带上的有序数据段)通过多轮平衡K路归并,最终生成完全有序的结果。主要功能包括:

​1. 预处理阶段

  • 禁用缩写键优化,确保使用完整比较器
  • 重构内存管理系统,切换为slab分配器优化小对象分配
  • 初始化K路归并堆结构,分配每个磁带的元组槽位

​2. 多阶段归并

  • 磁带轮转机制:将上一轮的输出磁带作为下一轮的输入
  • 动态缓冲区分配:根据剩余内存计算最优I/O缓冲区大小
  • 归并策略选择:自动判断是否进入最终单次归并阶段
  • 平衡归并执行:每次从N个输入磁带选取最小元素写入新运行

​3. 终止与收尾

  • 结果磁带冻结:阻止后续修改,准备读取
  • 资源清理:释放缓冲区内存,关闭磁带句柄
  • 状态更新:标记排序完成状态

  函数源码如下:

/*
 * 执行多阶段归并排序,将多个有序运行段合并为最终有序结果
 * 实现平衡K路归并算法,处理可能的多轮归并过程
 */
static void
mergeruns(Tuplesortstate *state)
{
    int         tapenum;  // 磁带编号迭代变量

    /* 校验当前状态:必须处于构建运行完成阶段且内存无剩余元组 */
    Assert(state->status == TSS_BUILDRUNS);
    Assert(state->memtupcount == 0);

    /* 禁用缩写键优化:磁盘数据不存储缩写键,需使用完整比较器 */
    if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
    {
        state->base.sortKeys->abbrev_converter = NULL;  // 清空缩写转换器
        state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator; // 切换完整比较器
        
        /* 清理相关字段保持结构整洁 */
        state->base.sortKeys->abbrev_abort = NULL;
        state->base.sortKeys->abbrev_full_comparator = NULL;
    }

    /* 重置元组内存上下文:释放所有已分配元组,切换slab分配策略 */
    MemoryContextResetOnly(state->base.tuplecontext);

    /* 释放原始大内存数组,后续使用更紧凑的堆结构 */
    FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    pfree(state->memtuples);
    state->memtuples = NULL;

    /* 初始化slab分配器:每个输入磁带分配1槽 + 结果保留槽 */
    if (state->base.tuples)
        init_slab_allocator(state, state->nOutputTapes + 1); // 元组类型需要内存管理
    else
        init_slab_allocator(state, 0); // 值类型无需额外分配

    /* 初始化归并堆:分配N路归并所需的元组槽 */
    state->memtupsize = state->nOutputTapes; // 堆容量等于输入磁带数
    state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
                                                        state->nOutputTapes * sizeof(SortTuple));
    USEMEM(state, GetMemoryChunkSpace(state->memtuples)); // 更新内存统计

    /* 分配磁带缓冲区内存:使用全部剩余可用内存 */
    state->tape_buffer_mem = state->availMem; 
    USEMEM(state, state->tape_buffer_mem);
    if (trace_sort)  // 调试日志:记录内存分配
        elog(LOG, "worker %d using %zu KB of memory for tape buffers",
             state->worker, state->tape_buffer_mem / 1024);

    /* 多阶段归并主循环 */
    for (;;)
    {
        /* 新归并轮次初始化:当输入运行耗尽时重新配置磁带 */
        if (state->nInputRuns == 0)
        {
            int64       input_buffer_size;

            /* 清理上一轮输入磁带 */
            if (state->nInputTapes > 0)
            {
                for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
                    LogicalTapeClose(state->inputTapes[tapenum]); // 关闭磁带句柄
                pfree(state->inputTapes); // 释放输入磁带数组
            }

            /* 轮转磁带角色:上轮输出变为本轮输入 */
            state->inputTapes = state->outputTapes;
            state->nInputTapes = state->nOutputTapes;
            state->nInputRuns = state->nOutputRuns;

            /* 初始化新输出磁带集合 */
            state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
            state->nOutputTapes = 0;
            state->nOutputRuns = 0;

            /* 计算输入缓冲区大小(平衡I/O与内存使用) */
            input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
                                                       state->nInputTapes,
                                                       state->nInputRuns,
                                                       state->maxTapes);

            /* 调试日志:记录归并轮次开始 */
            if (trace_sort)
                elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB for input: %s",
                     state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
                     pg_rusage_show(&state->ru_start));

            /* 准备输入磁带:倒带并设置读取缓冲区 */
            for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
                LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);

            /* 最终合并条件判断:单轮归并可结束 */
            if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0  // 无需随机访问
                && state->nInputRuns <= state->nInputTapes  // 运行数<=磁带数
                && !WORKER(state))  // 非工作进程
            {
                LogicalTapeSetForgetFreeSpace(state->tapeset); // 停止空间回收
                beginmerge(state);         // 初始化最终归并
                state->status = TSS_FINALMERGE; // 更新状态
                return;  // 进入最终归并阶段
            }
        }

        /* 选择输出磁带并执行单次归并 */
        selectnewtape(state);   // 分配新输出磁带
        mergeonerun(state);     // 合并一个完整运行

        /* 终止条件:输入耗尽且输出单一运行 */
        if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
            break;
    }

    /* 归并完成处理 */
    state->result_tape = state->outputTapes[0];  // 设置结果磁带
    if (!WORKER(state))
        LogicalTapeFreeze(state->result_tape, NULL);  // 主进程冻结磁带
    else
        worker_freeze_result_tape(state);  // 工作进程特殊处理
    state->status = TSS_SORTEDONTAPE;  // 更新为磁带已排序状态

    /* 资源清理:关闭所有输入磁带释放缓冲区 */
    for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
        LogicalTapeClose(state->inputTapes[tapenum]);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2328514.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(多看) CExercise_05_1函数_1.2计算base的exponent次幂

题目&#xff1a; 键盘录入两个整数&#xff1a;底(base)和幂指数(exponent)&#xff0c;计算base的exponent次幂&#xff0c;并打印输出对应的结果。&#xff08;注意底和幂指数都可能是负数&#xff09; 提示&#xff1a;求幂运算时&#xff0c;基础的思路就是先无脑把指数转…

Vuue2 element-admin管理后台,Crud.js封装表格参数修改

需求 表格数据调用列表接口&#xff0c;需要多传一个 Type字段&#xff0c;而Type字段的值 需要从跳转页面Url上面获取到&#xff0c;并赋值给Type&#xff0c;再传入列表接口中&#xff0c;最后拿到表格数据并展示 遇到的问题 需求很简单&#xff0c;但是因为表格使用的是统…

Tiktok矩阵运营中使用云手机的好处

Tiktok矩阵运营中使用云手机的好处 云手机在TikTok矩阵运营中能够大幅提高管理效率、降低封号风险&#xff0c;并节省成本&#xff0c;是非常实用的运营工具。TikTok矩阵运营使用云手机有很多优势&#xff0c;特别是对于需要批量管理账号、提高运营效率的团队来说。以下是几个…

Linux下调试器gdb_cgdb使用

文章目录 一、样例代码二、使用watchset var确定问题原因条件断点 一、样例代码 #include <stdio.h>int Sum(int s, int e) {int result 0;int i;for(i s; i < e; i){result i;}return result; }int main() {int start 1;int end 100;printf("I will begin…

Vite环境下解决跨域问题

在 Vite 开发环境中&#xff0c;可以通过配置代理来解决跨域问题。以下是具体步骤&#xff1a; 在项目根目录下找到 vite.config.js 文件&#xff1a;如果没有&#xff0c;则需要创建一个。配置代理&#xff1a;在 vite.config.js 文件中&#xff0c;使用 server.proxy 选项来…

超简单:Linux下opencv-gpu配置

1.下载opencv和opencv_contrib安装包 1&#xff09;使用命令下 git clone https://github.com/opencv/opencv.git -b 4.9.0 git clone https://github.com/opencv/opencv_contrib.git -b 4.9.02&#xff09;复制链接去GitHub下载然后上传到服务器 注意&#xff1a;看好版本&a…

泰博云平台solr接口存在SSRF漏洞

免责声明&#xff1a;本号提供的网络安全信息仅供参考&#xff0c;不构成专业建议。作者不对任何由于使用本文信息而导致的直接或间接损害承担责任。如涉及侵权&#xff0c;请及时与我联系&#xff0c;我将尽快处理并删除相关内容。 漏洞描述 SSRF漏洞是一种在未能获取服务器…

31天Python入门——第20天:魔法方法详解

你好&#xff0c;我是安然无虞。 文章目录 魔法方法1. __new__和__del__2. __repr__和__len__3. __enter__和__exit__4. 可迭代对象和迭代器5. 中括号[]数据操作6. __getattr__、__setattr__ 和 __delattr__7. 可调用的8. 运算符 魔法方法 魔法方法: Python中的魔法方法是一类…

ubantu22.04中搭建地图开发环境(qt5.15.2 + osg3.7.0 + osgearth3.7.1 + osgqt)

一、下载安装qt5.15.2 二、下载编译安装osg3.7.0 三、下载编译安装osgearth3.7.1 四、下载编译安装osgqt 五、二三维地图显示demo开发 六、成果展示&#xff1a; 已有功能&#xff1a;加载了dom影像、可以进行二三维地图切换显示、二维地图支持缩放和平移、三维地图支持旋转…

Bethune X 6发布:为小规模数据库环境打造轻量化智能监控巡检利器

3月31日&#xff0c;“奇点时刻・数智跃迁 -- 云和恩墨2025春季产品发布会”通过视频号直播的方式在线上举办。发布会上&#xff0c;云和恩墨副总经理熊军正式发布 zCloud 6.7和zData X 3.3两款产品新版本&#xff0c;同时也带来了 Bethune X 6——这款面向小规模数据库环境的智…

一文理解什么是中值模糊

目录 中值模糊的概念 中值模糊&#xff08;Median Blur&#xff09; 中值模糊的原理 示例&#xff1a;33 中值模糊 什么是椒盐噪声 椒盐噪声&#xff08;Salt-and-Pepper Noise&#xff09; 椒盐噪声的特点 OpenCV 中的 cv2.medianBlur() 函数格式 示例代码 中值模糊…

游戏引擎学习第192天

仓库:https://gitee.com/mrxiao_com/2d_game_4 回顾 我们现在正在编写一些界面代码&#xff0c;主要是用户界面&#xff08;UI&#xff09;&#xff0c;不过这里的UI并不是游戏内的用户界面&#xff0c;而是为开发者设计的用户界面。我们正在尝试做一些小的UI元素&#xff0c…

通信数据记录仪-产品概念ID

总结: 1、支持高速CAN、支持容错CAN、支持单线CAN(理解是支持不同的协议,CANFD、CAN2.0和LIN?) 2、 通过上位机设计时间

Mac VM 卸载 win10 安装win7系统

卸载 找到相应直接删除&#xff08;移动到废纸篓&#xff09; 可参考&#xff1a;mac如何卸载虚拟机win 下载 win7下载地址

基于图扑 HT 技术的电缆厂 3D 可视化管控系统深度解析

在当今数字化浪潮席卷制造业的大背景下&#xff0c;图扑软件&#xff08;Hightopo&#xff09;凭借其自主研发的强大技术&#xff0c;为电缆厂打造了一套先进的 3D 可视化管控系统。该系统基于 HT for Web 技术&#xff0c;为电缆厂的数字化转型提供了有力支撑。 HT 技术核心架…

《AI大模型开发笔记》MCP快速入门实战(一)

目录 1. MCP入门介绍 2. Function calling技术回顾 3. 大模型Agent开发技术体系回顾 二、 MCP客户端Client开发流程 1. uv工具入门使用指南 1.1 uv入门介绍 1.2 uv安装流程 1.3 uv的基本用法介绍 2.MCP极简客户端搭建流程 2.1 创建 MCP 客户端项目 2.2 创建MCP客户端…

常见的ETL工具分类整理

一、开源ETL工具 ‌Kettle&#xff08;Pentaho Data Integration&#xff09;--Spoon‌ 设计及架构&#xff1a;面向数据仓库建模的传统ETL工具。使用方式&#xff1a;C/S客户端模式&#xff0c;开发和生产环境需要独立部署&#xff0c;任务编写、调试、修改都在本地。底层架构…

Smart Link 技术全面解析

1.1 网络冗余技术的演进与需求 1.2 Smart Link 的核心价值与本文目标 第一章 Smart Link 技术概述 2.1 Smart Link 的应用场景与背景 2.2 Smart Link 的基本概念与组网角色 2.3 Smart Link 与传统技术的对比 第二章 Smart Link 工作原理 3.1 Smart Link 组的构成与运行机…

Roo Code(前身为 Roo Cline)一个 AI 驱动的自主编码代理

Roo Code&#xff08;前身为 Roo Cline&#xff09; Roo Code 是一个 AI 驱动的自主编码代理&#xff0c;它存在于您的编辑器中。它可以&#xff1a; 用自然语言沟通直接在您的工作区读写文件运行终端命令自动化浏览器操作与任何 OpenAI 兼容或自定义的 API/模型集成通过自定…

数字化三维实训室:无穿戴动作捕捉技术如何赋能体育与舞蹈

在高校体育与舞蹈教学中&#xff0c;精准的动作训练至关重要。传统训练方式依赖教练的肉眼观察与手动记录&#xff0c;存在效率低下、误差较大的情况。尤其在快速连续动作或复杂肢体形态的捕捉中&#xff0c;人工判读易受主观经验限制&#xff0c;难以实现标准化评估。面对传统…