文章目录
- 背景
- 示例
- FlinkLogicalCalcConverter
- BatchPhysicalCalcRule
- StreamPhysicalCalcRule
- 其它算子
- FlinkLogicalAggregate
- FlinkLogicalCorrelate
- FlinkLogicalDataStreamTableScan
- FlinkLogicalDistribution
- FlinkLogicalExpand
- FlinkLogicalIntermediateTableScan
- FlinkLogicalIntersect
- FlinkLogicalJoin
- FlinkLogicalLegacySink
- FlinkLogicalLegacyTableSourceScan
- FlinkLogicalMatch
- FlinkLogicalMinus
- FlinkLogicalOverAggregate
- FlinkLogicalRank
- FlinkLogicalSink
- FlinkLogicalSnapshot
- FlinkLogicalSort
- FlinkLogicalUnion
- FlinkLogicalValues
背景
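本文主要介绍 Flink 如何通过 ConverterRule 将 Calcite 的 RelNode 转换为自定义的 RelNode(FlinkLogical* 逻辑节点,以及 BatchPhysical* / StreamPhysical* 物理节点)。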
示例
FlinkLogicalCalcConverter
检查 RelNode 是否为 Calcite 的 LogicalCalc 算子。如果是,先把它的输入转换为带 FlinkConventions.LOGICAL trait 的 RelNode,再重写为 FlinkLogicalCalc 类型的 RelNode。
/**
 * Rewrites a Calcite `LogicalCalc` as a `FlinkLogicalCalc`.
 *
 * The calc's single input is requested in the `FlinkConventions.LOGICAL`
 * convention; the calc program is carried over unchanged.
 */
private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {
  override def convert(rel: RelNode): RelNode = {
    val logicalCalc = rel.asInstanceOf[LogicalCalc]
    FlinkLogicalCalc.create(
      RelOptRule.convert(logicalCalc.getInput, FlinkConventions.LOGICAL),
      logicalCalc.getProgram)
  }
}
BatchPhysicalCalcRule
匹配 FlinkLogicalCalc 节点,要求其 program 中的表达式都不包含 Python 函数调用。匹配后,将输入转换为 FlinkConventions.BATCH_PHYSICAL,并重写为 BatchPhysicalCalc。
/**
 * Converts a `FlinkLogicalCalc` into a `BatchPhysicalCalc` in the
 * `FlinkConventions.BATCH_PHYSICAL` convention.
 *
 * Calcs whose program contains a Python function call are not matched by this rule.
 */
class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {

  /** Fires only if no expression of the calc program is a Python call. */
  override def matches(call: RelOptRuleCall): Boolean = {
    val logicalCalc: FlinkLogicalCalc = call.rel(0)
    logicalCalc.getProgram.getExprList.asScala.forall(expr => !containsPythonCall(expr))
  }

  def convert(rel: RelNode): RelNode = {
    val logicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
    val batchTraits = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
    val batchInput = RelOptRule.convert(logicalCalc.getInput, FlinkConventions.BATCH_PHYSICAL)
    new BatchPhysicalCalc(
      rel.getCluster,
      batchTraits,
      batchInput,
      logicalCalc.getProgram,
      rel.getRowType)
  }
}
StreamPhysicalCalcRule
匹配 FlinkLogicalCalc 节点,要求其 program 中的表达式都不包含 Python 函数调用。匹配后,将输入转换为 FlinkConventions.STREAM_PHYSICAL,并重写为 StreamPhysicalCalc。
/**
 * Converts a `FlinkLogicalCalc` into a `StreamPhysicalCalc` in the
 * `FlinkConventions.STREAM_PHYSICAL` convention.
 *
 * Calcs whose program contains a Python function call are not matched by this rule.
 */
class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {

  /** Fires only if no expression of the calc program is a Python call. */
  override def matches(call: RelOptRuleCall): Boolean = {
    val logicalCalc: FlinkLogicalCalc = call.rel(0)
    logicalCalc.getProgram.getExprList.asScala.forall(expr => !containsPythonCall(expr))
  }

  def convert(rel: RelNode): RelNode = {
    val logicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
    val streamTraits = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
    val streamInput = RelOptRule.convert(logicalCalc.getInput, FlinkConventions.STREAM_PHYSICAL)
    new StreamPhysicalCalc(
      rel.getCluster,
      streamTraits,
      streamInput,
      logicalCalc.getProgram,
      rel.getRowType)
  }
}
其它算子
介绍下算子的匹配条件
FlinkLogicalAggregate
对应的SQL语义是聚合函数
FlinkLogicalAggregateBatchConverter
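当聚合调用中不存在精确(accurate,即非近似)的 DISTINCT 调用,并且所有聚合函数都受支持时,返回 true。除 AVG 外,AVG_AGG_FUNCTIONS 中的其它函数(STDDEV_POP、STDDEV_SAMP、VAR_POP、VAR_SAMP)不被原生支持,需要先由 FlinkAggregateReduceFunctionsRule 改写。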
/**
 * Batch-side guard for converting a `LogicalAggregate`.
 *
 * Returns true when no aggregate call is an accurate DISTINCT call and every call
 * is natively supported.
 */
override def matches(call: RelOptRuleCall): Boolean = {
  val logicalAgg: LogicalAggregate = call.rel(0)
  // Every AVG-family function other than AVG itself is unsupported natively;
  // such calls have to be rewritten by FlinkAggregateReduceFunctionsRule first.
  val allCallsSupported = !logicalAgg.getAggCallList.exists { aggCall =>
    val kind = aggCall.getAggregation.getKind
    kind != SqlKind.AVG && SqlKind.AVG_AGG_FUNCTIONS.contains(kind)
  }
  val hasAccurateDistinctCall =
    AggregateUtil.containsAccurateDistinctCall(logicalAgg.getAggCallList)
  allCallsSupported && !hasAccurateDistinctCall
}
FlinkLogicalAggregateStreamConverter
只要聚合调用中不包含 SqlKind.STDDEV_POP、SqlKind.STDDEV_SAMP、SqlKind.VAR_POP、SqlKind.VAR_SAMP 这几种函数,就支持转换。这几种函数需要先由 FlinkAggregateReduceFunctionsRule 改写。
/**
 * Stream-side guard for converting a `LogicalAggregate`.
 *
 * Returns false if any aggregate call is STDDEV_POP, STDDEV_SAMP, VAR_POP or VAR_SAMP.
 */
override def matches(call: RelOptRuleCall): Boolean = {
  val logicalAgg = call.rel(0).asInstanceOf[LogicalAggregate]
  // These kinds are not supported natively; FlinkAggregateReduceFunctionsRule
  // has to rewrite them before this converter may fire.
  val unsupportedKinds =
    Set(SqlKind.STDDEV_POP, SqlKind.STDDEV_SAMP, SqlKind.VAR_POP, SqlKind.VAR_SAMP)
  !logicalAgg.getAggCallList.exists { aggCall =>
    unsupportedKinds.contains(aggCall.getAggregation.getKind)
  }
}
FlinkLogicalCorrelate
对应的 SQL 语义:LogicalCorrelate 用于处理关联子查询以及某些特殊的连接操作。
转换规则检查 RelNode 是否为 LogicalCorrelate,如果是则重写为 FlinkLogicalCorrelate。
该规则没有自定义匹配条件,使用 ConverterRule 默认的 onMatch 逻辑。
FlinkLogicalDataStreamTableScan
对应的 SQL 语义:扫描由 DataStream 注册而成的表(DataStreamTable),即流式数据源。
匹配条件:TableScan 所引用的表能够 unwrap 为 DataStreamTable。满足条件时,将其重写为 FlinkLogicalDataStreamTableScan。
/** Matches only table scans whose table unwraps to a `DataStreamTable`. */
override def matches(call: RelOptRuleCall): Boolean = {
  val tableScan: TableScan = call.rel(0)
  Option(tableScan.getTable.unwrap(classOf[DataStreamTable[_]])).isDefined
}
/** Re-creates the scan as a `FlinkLogicalDataStreamTableScan`, keeping its hints and table. */
def convert(rel: RelNode): RelNode = {
  val tableScan = rel.asInstanceOf[TableScan]
  val hints = tableScan.getHints
  FlinkLogicalDataStreamTableScan.create(tableScan.getCluster, hints, tableScan.getTable)
}
FlinkLogicalDistribution
描述数据的分布方式:按分布键(distKeys)对数据重新分发,并携带排序(collation)信息。
/**
 * Rewrites a `LogicalDistribution` as a `FlinkLogicalDistribution`.
 *
 * The input is converted to the LOGICAL convention; collation and distribution keys are kept.
 */
override def convert(rel: RelNode): RelNode = {
  val logicalDistribution = rel.asInstanceOf[LogicalDistribution]
  FlinkLogicalDistribution.create(
    RelOptRule.convert(logicalDistribution.getInput, FlinkConventions.LOGICAL),
    logicalDistribution.getCollation,
    logicalDistribution.getDistKeys)
}
FlinkLogicalExpand
支持复杂聚合操作(如 ROLLUP 和 CUBE)的逻辑运算符
/**
 * Rewrites a `LogicalExpand` as a `FlinkLogicalExpand`.
 *
 * The input is converted to the LOGICAL convention; the projects and expand-id index are kept.
 */
override def convert(rel: RelNode): RelNode = {
  val logicalExpand = rel.asInstanceOf[LogicalExpand]
  FlinkLogicalExpand.create(
    RelOptRule.convert(logicalExpand.getInput, FlinkConventions.LOGICAL),
    logicalExpand.projects,
    logicalExpand.expandIdIndex)
}
FlinkLogicalIntermediateTableScan
FlinkLogicalIntermediateTableScan 用于表示对中间结果表(IntermediateRelTable)进行扫描的逻辑操作。匹配条件是 TableScan 引用的表能够 unwrap 为 IntermediateRelTable。
/** Matches only table scans whose table unwraps to an `IntermediateRelTable`. */
override def matches(call: RelOptRuleCall): Boolean = {
  val tableScan: TableScan = call.rel(0)
  Option(tableScan.getTable.unwrap(classOf[IntermediateRelTable])).isDefined
}
/** Re-creates the scan as a `FlinkLogicalIntermediateTableScan` over the same table. */
def convert(rel: RelNode): RelNode = {
  val tableScan = rel.asInstanceOf[TableScan]
  val scannedTable = tableScan.getTable
  FlinkLogicalIntermediateTableScan.create(tableScan.getCluster, scannedTable)
}
FlinkLogicalIntersect
用于表示 SQL 中 INTERSECT 操作的逻辑运算符
/**
 * Rewrites a `LogicalIntersect` as a `FlinkLogicalIntersect`.
 *
 * Every input is converted to the LOGICAL convention; the `all` flag is kept.
 */
override def convert(rel: RelNode): RelNode = {
  val logicalIntersect = rel.asInstanceOf[LogicalIntersect]
  val logicalInputs = logicalIntersect.getInputs.map { child =>
    RelOptRule.convert(child, FlinkConventions.LOGICAL)
  }
  FlinkLogicalIntersect.create(logicalInputs, logicalIntersect.all)
}
FlinkLogicalJoin
用于表示 SQL 中 JOIN 操作的逻辑运算符
/**
 * Rewrites a `LogicalJoin` as a `FlinkLogicalJoin`.
 *
 * Both sides are converted to the LOGICAL convention (left first, then right); the
 * condition, hints and join type are kept.
 */
override def convert(rel: RelNode): RelNode = {
  val logicalJoin = rel.asInstanceOf[LogicalJoin]
  def toLogical(side: RelNode): RelNode = RelOptRule.convert(side, FlinkConventions.LOGICAL)
  FlinkLogicalJoin.create(
    toLogical(logicalJoin.getLeft),
    toLogical(logicalJoin.getRight),
    logicalJoin.getCondition,
    logicalJoin.getHints,
    logicalJoin.getJoinType)
}
FlinkLogicalLegacySink
通过旧版(legacy)TableSink 接口写出数据的逻辑算子。
/**
 * Rewrites a `LogicalLegacySink` as a `FlinkLogicalLegacySink`.
 *
 * The input is converted to the LOGICAL convention. Hints, sink, sink name, catalog table
 * and static partitions are carried over unchanged.
 */
override def convert(rel: RelNode): RelNode = {
  val legacySink = rel.asInstanceOf[LogicalLegacySink]
  val logicalInput = RelOptRule.convert(legacySink.getInput, FlinkConventions.LOGICAL)
  FlinkLogicalLegacySink.create(
    logicalInput,
    legacySink.hints,
    legacySink.sink,
    legacySink.sinkName,
    legacySink.catalogTable,
    legacySink.staticPartitions)
}
FlinkLogicalLegacyTableSourceScan
通过旧版(legacy)TableSource 接口读取数据的逻辑算子。匹配条件是 isTableSourceScan(scan) 为 true。
/** Matches table scans for which `isTableSourceScan` holds. */
override def matches(call: RelOptRuleCall): Boolean =
  isTableSourceScan(call.rel[TableScan](0))
/** Re-creates the scan as a `FlinkLogicalLegacyTableSourceScan`, keeping its hints. */
def convert(rel: RelNode): RelNode = {
  val tableScan = rel.asInstanceOf[TableScan]
  val preparingTable = tableScan.getTable.asInstanceOf[FlinkPreparingTableBase]
  FlinkLogicalLegacyTableSourceScan.create(tableScan.getCluster, tableScan.getHints, preparingTable)
}
FlinkLogicalMatch
MATCH_RECOGNIZE 语句的逻辑运算符。MATCH_RECOGNIZE 语句允许用户在流数据中进行复杂的事件模式匹配,这对于实时数据处理和复杂事件处理(CEP)非常有用。
/**
 * Rewrites a `LogicalMatch` (MATCH_RECOGNIZE) as a `FlinkLogicalMatch`.
 *
 * The node's trait set switches to the LOGICAL convention and its input is converted
 * accordingly. All pattern-related attributes are copied from the original node.
 */
override def convert(rel: RelNode): RelNode = {
  val source = rel.asInstanceOf[LogicalMatch]
  val logicalTraits = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
  val logicalInput = RelOptRule.convert(source.getInput, FlinkConventions.LOGICAL)
  new FlinkLogicalMatch(
    rel.getCluster,
    logicalTraits,
    logicalInput,
    source.getRowType,
    source.getPattern,
    source.isStrictStart,
    source.isStrictEnd,
    source.getPatternDefinitions,
    source.getMeasures,
    source.getAfter,
    source.getSubsets,
    source.isAllRows,
    source.getPartitionKeys,
    source.getOrderKeys,
    source.getInterval)
}
FlinkLogicalMinus
用于表示 SQL 中 MINUS(EXCEPT)操作的逻辑运算符
/**
 * Rewrites a `LogicalMinus` as a `FlinkLogicalMinus`.
 *
 * Every input is converted to the LOGICAL convention; the `all` flag is kept.
 */
override def convert(rel: RelNode): RelNode = {
  val logicalMinus = rel.asInstanceOf[LogicalMinus]
  val logicalInputs = logicalMinus.getInputs.map { child =>
    RelOptRule.convert(child, FlinkConventions.LOGICAL)
  }
  FlinkLogicalMinus.create(logicalInputs, logicalMinus.all)
}
FlinkLogicalOverAggregate
用于表示 SQL 中 OVER 窗口函数(窗口聚合)操作的逻辑运算符
FlinkLogicalRank
SQL 中 ROW_NUMBER、RANK、DENSE_RANK 等排名函数对应的逻辑运算符(常用于 Top-N 查询)。这些函数通常用于对数据进行排序和排名。
/**
 * Rewrites a `LogicalRank` as a `FlinkLogicalRank`.
 *
 * The input is converted to the LOGICAL convention. Partition key, order key, rank type,
 * rank range, rank-number type and the output-rank-number flag are kept.
 */
override def convert(rel: RelNode): RelNode = {
  val logicalRank = rel.asInstanceOf[LogicalRank]
  val logicalInput = RelOptRule.convert(logicalRank.getInput, FlinkConventions.LOGICAL)
  FlinkLogicalRank.create(
    logicalInput,
    logicalRank.partitionKey,
    logicalRank.orderKey,
    logicalRank.rankType,
    logicalRank.rankRange,
    logicalRank.rankNumberType,
    logicalRank.outputRankNumber)
}
FlinkLogicalSink
表示 SQL 中将数据写出到 sink 的操作(例如 INSERT INTO)
FlinkLogicalSnapshot
SQL 语句中 FOR SYSTEM_TIME AS OF 子句对应的逻辑运算符。该子句用于引用表在特定时间点的数据快照。转换时,若时间段(period)是字段访问(RexFieldAccess),则生成 FlinkLogicalSnapshot;若是字面量(RexLiteral),则去掉快照节点,只保留转换后的输入。
/**
 * Converts a LogicalSnapshot (the `FOR SYSTEM_TIME AS OF` clause) into logical form.
 *
 * The snapshot's input is first converted to the FlinkConventions.LOGICAL convention.
 * Then:
 *  - if the period is a field access (RexFieldAccess), a FlinkLogicalSnapshot is created
 *    over the converted input with the same period;
 *  - if the period is a literal (RexLiteral), the snapshot node is dropped and the
 *    converted input is returned on its own.
 *
 * NOTE(review): the match below has no default case, so any other RexNode kind would
 * raise scala.MatchError; presumably this rule's `matches` rejects such periods — confirm.
 */
def convert(rel: RelNode): RelNode = {
val snapshot = rel.asInstanceOf[LogicalSnapshot]
val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)
snapshot.getPeriod match {
// Period computed from a field: keep the snapshot as a FlinkLogicalSnapshot.
case _: RexFieldAccess =>
FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)
// Literal period: drop the snapshot node and keep only the converted input.
case _: RexLiteral =>
newInput
}
}
FlinkLogicalSort
表示SQL里的排序
FlinkLogicalUnion
表示 SQL 中的 UNION 操作。该规则只匹配 UNION ALL(union.all 为 true)。
/** Matches only UNION ALL; a distinct UNION is not handled by this rule. */
override def matches(call: RelOptRuleCall): Boolean =
  call.rel[LogicalUnion](0).all
/**
 * Rewrites a `LogicalUnion` as a `FlinkLogicalUnion`.
 *
 * Every input is converted to the LOGICAL convention; the `all` flag is kept.
 */
override def convert(rel: RelNode): RelNode = {
  val logicalUnion = rel.asInstanceOf[LogicalUnion]
  val logicalInputs = logicalUnion.getInputs.map { child =>
    RelOptRule.convert(child, FlinkConventions.LOGICAL)
  }
  FlinkLogicalUnion.create(logicalInputs, logicalUnion.all)
}
FlinkLogicalValues
SQL 中 VALUES 表达式的逻辑运算符。VALUES 表达式允许在查询中直接定义一组值,这在需要构造临时数据或进行简单的数据输入时非常有用。