SQL解析在下文中指的是将一条sql语句经过一系列的解析最后生成一个完整的物理执行计划的过程。这个过程包括以下四个步骤:词法分析、语法分析、生成逻辑计划、生成物理计划。Doris SQL解析具体包括了六个步骤:词法分析,语法分析、语义分析,生成单机逻辑计划,生成分布式逻辑计划,生成物理计划。具体代码实现上包含以下五个步骤:Parse、Analyze、SinglePlan、DistributedPlan、Schedule。
Parse
Parser阶段主要包含对新一代优化器NeridsParser解析器的调用(在COM_QUERY且Nereid优化器开启的情况下)NereidsParser().parseSQL(originStmt)
,其生成的Statement是LogicalPlanAdapter类类型;如果关闭Nereid优化器或fall behind,就会触发legacy优化器,也就是调用parse函数,原始解析流程。
Analyze、SinglePlan、DistributedPlan、Schedule
StmtExecutor类包含了Analyze、SinglePlan、DistributedPlan、Schedule功能。
Analyze主要是对Parse阶段生成的抽象语法树AST进行一些前期的处理和语义分析,为生成单机逻辑计划做准备。抽象语法树是由StatementBase这个抽象类表示。这个抽象类包含一个最重要的成员函数analyze(),用来执行Analyze阶段要做的事。不同类型的查询select, insert, show, set, alter table, create table等经过Parse阶段后生成不同的数据结构(SelectStmt, InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt等),这些数据结构继承自StatementBase,并实现analyze()函数,对特定类型的SQL进行特定的Analyze。例如:select类型的查询,会转成对select sql的子语句SelectList, FromClause, GroupByClause, HavingClause, WhereClause, SortInfo等的analyze()。然后这些子语句再各自对自己的子结构进行进一步的analyze(),通过层层迭代,把各种类型的sql的各种情景都分析完毕。例如:WhereClause进一步分析其包含的BetweenPredicate(between表达式), BinaryPredicate(二元表达式), CompoundPredicate(and or组合表达式), InPredicate(in表达式)等。
对于查询类型的SQL,包含以下几项重要工作:
· 元信息的识别和解析:识别和解析sql中涉及的 Cluster, Database, Table, Column 等元信息,确定需要对哪个集群的哪个数据库的哪些表的哪些列进行计算。
· SQL 的合法性检查:窗口函数不能 DISTINCT,投影列是否有歧义,where语句中不能含有grouping操作等。
· SQL 简单重写:比如将 select * 扩展成 select 所有列,count distinct转成bitmap或者hll函数等。
· 函数处理:检查sql中包含的函数和系统定义的函数是否一致,包括参数类型,参数个数等。
· Table 和 Column 的别名处理
· 类型检查和转换:例如二元表达式两边的类型不一致时,需要对其中一个类型进行转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需要 Cast 成 DECIMAL)。
对AST 进行analyze后,会再进行一次rewrite操作,进行精简或者是转成统一的处理方式。目前rewrite的算法是基于规则的方式,针对AST的树状结构,自底向上,应用每一条规则进行重写。如果重写后,AST有变化,则再次进行analyze和rewrite,直到AST无变化为止。例如:常量表达式的化简:1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。将一些语句转成统一的处理方式,比如将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。
生成单机逻辑Plan阶段,这部分工作主要是根据AST抽象语法树生成代数关系,也就是俗称的算子数。树上的每个节点都是一个算子,代表着一种操作。具体来说这个阶段主要做了如下几项工作:
· Slot 物化:指确定一个表达式对应的列需要 Scan 和计算,比如聚合节点的聚合函数表达式和 Group By 表达式需要进行物化。
· 投影下推:BE 在 Scan 时只会 Scan 必须读取的列。
· 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到 Scan 节点。
· 分区,分桶裁剪:根据过滤条件中的信息,确定需要扫描哪些分区,哪些桶的tablet。
· Join Reorder:对于 Inner Join, Doris 会根据行数调整表的顺序,将大表放在前面。
· Sort + Limit 优化成 TopN:对于order by limit语句会转换成TopN的操作节点,方便统一处理。
· MaterializedView 选择:会根据查询需要的列,过滤,排序和 Join 的列,行数,列数等因素选择最佳的物化视图。
生成分布式Plan阶段,有了单机的PlanNode树之后,就需要进一步根据分布式环境,拆成分布式PlanFragment树(PlanFragment用来表示独立的执行单元),毕竟一个表的数据分散地存储在多台主机上,完全可以让一些计算并行起来。这个步骤的主要目标是最大化并行度和数据本地化。主要方法是将能够并行执行的节点拆分出去单独建立一个PlanFragment,用ExchangeNode代替被拆分出去的节点,用来接收数据。拆分出去的节点增加一个DataSinkNode,用来将计算之后的数据传送到ExchangeNode中,做进一步的处理。这一步采用递归的方法,自底向上,遍历整个PlanNode树,然后给树上的每个叶子节点创建一个PlanFragment,如果碰到父节点,则考虑将其中能够并行执行的子节点拆分出去,父节点和保留下来的子节点组成一个parent PlanFragment。拆分出去的子节点增加一个父节点DataSinkNode组成一个child PlanFragment,child PlanFragment指向parent PlanFragment。这样就确定了数据的流动方向。对于查询操作来说,join操作是最常见的一种操作。
Doris目前支持4种join算法:broadcast join,hash partition join,colocate join,bucket shuffle join。
· broadcast join:将小表发送到大表所在的每台机器,然后进行hash join操作。当一个表扫描出的数据量较少时,计算broadcast join的cost,通过计算比较hash partition的cost,来选择cost最小的方式。
· hash partition join:当两张表扫描出的数据都很大时,一般采用hash partition join。它遍历表中的所有数据,计算key的哈希值,然后对集群数取模,选到哪台机器,就将数据发送到这台机器进行hash join操作。
· colocate join:两个表在创建的时候就指定了数据分布保持一致,那么当两个表的join key与分桶的key一致时,就会采用colocate join算法。由于两个表的数据分布是一样的,那么hash join操作就相当于在本地,不涉及到数据的传输,极大提高查询性能。
· bucket shuffle join:当join key是分桶key,并且只涉及到一个分区时,就会优先采用bucket shuffle join算法。由于分桶本身就代表了数据的一种切分方式,所以可以利用这一特点,只需将右表对左表的分桶数hash取模,这样只需网络传输一份右表数据,极大减少了数据的网络传输,如图10所示。
Schedule阶段,这一步是根据分布式逻辑计划,创建分布式物理计划。主要解决以下问题:哪个 BE 执行哪个 PlanFragment?每个 Tablet 选择哪个副本去查询?如何进行多实例并发?
创建分布式物理计划的核心流程:
a. prepare阶段:给每个PlanFragment创建一个FragmentExecParams结构,用来表示PlanFragment执行时所需的所有参数;如果一个PlanFragment包含有DataSinkNode,则找到数据发送的目的PlanFragment,然后指定目的PlanFragment的FragmentExecParams的输入为该PlanFragment的FragmentExecParams。
b. computeScanRangeAssignment阶段:针对不同类型的join进行不同的处理。
· computeScanRangeAssignmentByColocate:针对colocate join进行处理,由于join的两个表桶中的数据分布都是一样的,他们是基于桶的join操作,所以在这里是确定每个桶选择哪个host。在给host分配桶时,尽量保证每个host分配到的桶基本平均。
· computeScanRangeAssignmentByBucket:针对bucket shuffle join进行处理,也只是基于桶的操作,所以在这里是确定每个桶选择哪个host。在给host分配桶时,同样需要尽量保证每个host分配到的桶基本平均。
· computeScanRangeAssignmentByScheduler:针对其他类型的join进行处理。确定每个scanNode读取tablet哪个副本。一个scanNode会读取多个tablet,每个tablet有多个副本。为了使scan操作尽可能分散到多台机器上执行,提高并发性能,减少IO压力,Doris采用了Round-Robin算法,使tablet的扫描尽可能地分散到多台机器上去。例如100个tablet需要扫描,每个tablet 3个副本,一共10台机器,在分配时,保障每台机器扫描10个tablet。
c. computeFragmentExecParams阶段:这个阶段解决PlanFragment下发到哪个BE上执行,以及如何处理实例并发问题。确定了每个tablet的扫描地址之后,就可以以地址为维度,将FragmentExecParams生成多个实例,也就是FragmentExecParams中包含的地址有多个,就生成多个实例FInstanceExecParam。如果设置了并发度,那么一个地址的执行实例再进一步的拆成多个FInstanceExecParam。针对bucket shuffle join和colocate join会有一些特殊处理,但是基本思想一样。FInstanceExecParam创建完成后,会分配一个唯一的ID,方便追踪信息。如果FragmentExecParams中包含有ExchangeNode,需要计算有多少senders,以便知道需要接受多少个发送方的数据。最后FragmentExecParams确定destinations,并把目的地址填充上去。
d. create result receiver阶段:result receiver是查询完成后,最终数据需要输出的地方。
e. to thrift阶段:根据所有PlanFragment的FInstanceExecParam创建rpc请求,然后下发到BE端执行。这样一个完整的SQL解析过程完成了。
https://blog.csdn.net/weixin_37850264/article/details/112761773