一、前言
本文主要介绍内容一条查询语句如何实现由逻辑计划生成物理计划。查询语句要执行的操作、涉及的表信息等存放于逻辑计划的 PlanNode 中,物理计划的构建就是通过解析逻辑计划的 PlanNode,将对应的 PlanNode 转换为对应算子(Processor),算子之间再通过 Stream 连接。
为了方便大家理解本文的内容,先简单介绍下面三个概念:
- Physical Plan:物理计划,将逻辑查询计划的每一个操作符选择实现算法并选择这些操作符的执行顺序得到的计划。
- Processor:物理计划得到的结果,存放了语句所需要执行的算子的相关信息,需要执行什么算子,分布式中算子需要发送到哪个节点中执行等。
- Stream:存放在 Processor 的信息,标注了 Processor 中的算子的执行顺序以及执行节点的信息。
二、物理计划的构建
我们将通过以下 SQL 语句来介绍物理计划的生成:
select max(height),class from heights join students on heights.id=students.id group by class having class in(1,2) order by max(height) desc limit 2;
该查询语句的 PlanNode 如下图所示:
以上述 PlanNode 为例,其最下层为两个 scanNode,分别是对 heights 和 students 的一个全表扫描,其结果会返回给上层的 joinNode,joinNode 会将两张表作 join 生成一张虚拟表,里面有两张表中的所有列。
其上层的 renderNode 会对这张虚拟表进行查询,筛选出 height 列和 class 列,groupNode 会对 class 列进行 group 处理,并对 class 作 max 聚合。然后, sortNode 对 groupNode 处理过的 max 聚合结果进行排序,limitNode 对结果作相应的操作。最后,最上层的 renderNode 对结果进行查询,筛选出 max(height) 列和 class 列。以上即为该 PlanNode 的详细信息。
接着会通过 createPlanForNode 函数对 PlanNode 进行解析生成物理计划。该函数是一个递归函数,会通过 PlanNode 的类型来构建相应的物理计划。
以上述查询语句为例,该 PlanNode 会层层递归先执行 scanNode 的构建函数 createTableReaders;接着,通过 initTableReaderSpec 新建 tablereader 的 spec;随后,通过逻辑计划传下来的 plannode 得到算子的 filter 和 limit;然后,通过 MakeExpression() 构造物理计划的 filter 并将 filter 和 limit 传入 post 中。
最后,通过 planCtx 的 isLocal 判断是否是分布式读取计划。
- 若是,则构建 SpanPartition 数组,将到各个节点读取 table 的值;
- 若不是,则单读取本地数据即可。
具体流程如下图所示:
在构建完 left scanNode 和 right scanNode 的计划后得到 rightPlan 和 leftPlan, leftPlan 和 rightPlan 执行 MergePlans() 合并左右计划,将左右计划的 processor 和 stream 等信息合并。
之后判断是否为分布式执行的步骤与上面的判断方法类似。最后再判断 leftMergeOrd.Columns 是否等于 nil。
- 若是,则构建 hashjoinspec;
- 若不是,则构建 mergejoinspec。
执行 AddjoinStage() 将 joinProcessor 添加到指定的节点上去并将左右 output 连接到这些 Processor 中,joinNode 也就处理完毕了,基本流程如下图。依次处理 renderNode, groupNode, sortNode 等,将相应算子信息添加到物理计划中。