详解flink sql, calcite logical转flink logical

news2025/1/10 19:03:48

文章目录

    • 背景
    • 示例
    • FlinkLogicalCalcConverter
    • BatchPhysicalCalcRule
    • StreamPhysicalCalcRule
    • 其它算子
      • FlinkLogicalAggregate
      • FlinkLogicalCorrelate
      • FlinkLogicalDataStreamTableScan
      • FlinkLogicalDistribution
      • FlinkLogicalExpand
      • FlinkLogicalIntermediateTableScan
      • FlinkLogicalIntersect
      • FlinkLogicalJoin
      • FlinkLogicalLegacySink
      • FlinkLogicalLegacyTableSourceScan
      • FlinkLogicalMatch
      • FlinkLogicalMinus
      • FlinkLogicalOverAggregate
      • FlinkLogicalRank
      • FlinkLogicalSink
      • FlinkLogicalSnapshot
      • FlinkLogicalSort
      • FlinkLogicalUnion
      • FlinkLogicalValues

背景

本文主要介绍calcite 如何转成自定义的relnode

在这里插入图片描述

示例

FlinkLogicalCalcConverter

检查是不是calcite 的LogicalCalc 算子,是的话,重写带RelTrait 为FlinkConventions.LOGICA
的rel,类型FlinkLogicalCalc

private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {

  override def convert(rel: RelNode): RelNode = {
    val calc = rel.asInstanceOf[LogicalCalc]
    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalCalc.create(newInput, calc.getProgram)
  }
}

BatchPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {

  override def matches(call: RelOptRuleCall): Boolean = {
    val calc: FlinkLogicalCalc = call.rel(0)
    val program = calc.getProgram
    !program.getExprList.asScala.exists(containsPythonCall(_))
  }

  def convert(rel: RelNode): RelNode = {
    val calc = rel.asInstanceOf[FlinkLogicalCalc]
    val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)

    new BatchPhysicalCalc(rel.getCluster, newTrait, newInput, calc.getProgram, rel.getRowType)
  }
}

StreamPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {

  override def matches(call: RelOptRuleCall): Boolean = {
    val calc: FlinkLogicalCalc = call.rel(0)
    val program = calc.getProgram
    !program.getExprList.asScala.exists(containsPythonCall(_))
  }

  def convert(rel: RelNode): RelNode = {
    val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)

    new StreamPhysicalCalc(rel.getCluster, traitSet, newInput, calc.getProgram, rel.getRowType)
  }
}

其它算子

介绍下算子的匹配条件

FlinkLogicalAggregate

对应的SQL语义是聚合函数
FlinkLogicalAggregateBatchConverter
不存在准确的distinct调用并且支持聚合函数,则返回true

override def matches(call: RelOptRuleCall): Boolean = {
    val agg = call.rel(0).asInstanceOf[LogicalAggregate]

    // we do not support these functions natively
    // they have to be converted using the FlinkAggregateReduceFunctionsRule
    val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
      // we support AVG
      case SqlKind.AVG => true
      // but none of the other AVG agg functions
      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
      case _ => true
    }

    val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)

    !hasAccurateDistinctCall && supported
  }

FlinkLogicalAggregateStreamConverter

SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP
非这几种,都支持转换

override def matches(call: RelOptRuleCall): Boolean = {
    val agg = call.rel(0).asInstanceOf[LogicalAggregate]

    // we do not support these functions natively
    // they have to be converted using the FlinkAggregateReduceFunctionsRule
    agg.getAggCallList.map(_.getAggregation.getKind).forall {
      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
      case _ => true
    }
  }

FlinkLogicalCorrelate

对应的SQL语义是,LogicalCorrelate 用于处理关联子查询和某些特殊的连接操作
检查relnode 是不是LogicalCorrelate,重写relnode

默认的onMatch 函数

FlinkLogicalDataStreamTableScan

对应的SQL语义是,检查数据源是不是流式的
检查relnode 是不是LogicalCorrelate,重写relnode

  override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])
    dataStreamTable != null
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable)
  }

FlinkLogicalDistribution

描述数据是不是打散的

  override def convert(rel: RelNode): RelNode = {
    val distribution = rel.asInstanceOf[LogicalDistribution]
    val newInput = RelOptRule.convert(distribution.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalDistribution.create(newInput, distribution.getCollation, distribution.getDistKeys)
  }

FlinkLogicalExpand

支持复杂聚合操作(如 ROLLUP 和 CUBE)的逻辑运算符

 override def convert(rel: RelNode): RelNode = {
    val expand = rel.asInstanceOf[LogicalExpand]
    val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalExpand.create(newInput, expand.projects, expand.expandIdIndex)
  }

FlinkLogicalIntermediateTableScan

FlinkLogicalIntermediateTableScan 用于表示对这些中间结果表进行扫描的逻辑操作

override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    val intermediateTable = scan.getTable.unwrap(classOf[IntermediateRelTable])
    intermediateTable != null
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    FlinkLogicalIntermediateTableScan.create(rel.getCluster, scan.getTable)
  }

FlinkLogicalIntersect

用于表示 SQL 中 INTERSECT 操作的逻辑运算符

override def convert(rel: RelNode): RelNode = {
    val intersect = rel.asInstanceOf[LogicalIntersect]
    val newInputs = intersect.getInputs.map {
      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
    }
    FlinkLogicalIntersect.create(newInputs, intersect.all)
  }

FlinkLogicalJoin

用于表示 SQL 中 JOIN 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {
    val join = rel.asInstanceOf[LogicalJoin]
    val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
    val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
    FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getHints, join.getJoinType)
  }

FlinkLogicalLegacySink

写数据到传统的数据源

override def convert(rel: RelNode): RelNode = {
    val sink = rel.asInstanceOf[LogicalLegacySink]
    val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalLegacySink.create(
      newInput,
      sink.hints,
      sink.sink,
      sink.sinkName,
      sink.catalogTable,
      sink.staticPartitions)
  }

FlinkLogicalLegacyTableSourceScan

读传统的数据源

override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    isTableSourceScan(scan)
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase]
    FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table)
  }

FlinkLogicalMatch

MATCH_RECOGNIZE 语句的逻辑运算符。MATCH_RECOGNIZE 语句允许用户在流数据中进行复杂的事件模式匹配,这对于实时数据处理和复杂事件处理(CEP)非常有用。

override def convert(rel: RelNode): RelNode = {
    val logicalMatch = rel.asInstanceOf[LogicalMatch]
    val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
    val newInput = RelOptRule.convert(logicalMatch.getInput, FlinkConventions.LOGICAL)

    new FlinkLogicalMatch(
      rel.getCluster,
      traitSet,
      newInput,
      logicalMatch.getRowType,
      logicalMatch.getPattern,
      logicalMatch.isStrictStart,
      logicalMatch.isStrictEnd,
      logicalMatch.getPatternDefinitions,
      logicalMatch.getMeasures,
      logicalMatch.getAfter,
      logicalMatch.getSubsets,
      logicalMatch.isAllRows,
      logicalMatch.getPartitionKeys,
      logicalMatch.getOrderKeys,
      logicalMatch.getInterval)
  }

FlinkLogicalMinus

用于表示 SQL 中 minus 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {
    val minus = rel.asInstanceOf[LogicalMinus]
    val newInputs = minus.getInputs.map {
      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
    }
    FlinkLogicalMinus.create(newInputs, minus.all)
  }

FlinkLogicalOverAggregate

用于表示 SQL 中 窗口函数操作的逻辑运算符

FlinkLogicalRank

SQL 中 RANK 或 DENSE_RANK 函数的逻辑运算符。这些函数通常用于对数据进行排序和排名

override def convert(rel: RelNode): RelNode = {
    val rank = rel.asInstanceOf[LogicalRank]
    val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalRank.create(
      newInput,
      rank.partitionKey,
      rank.orderKey,
      rank.rankType,
      rank.rankRange,
      rank.rankNumberType,
      rank.outputRankNumber
    )
  }

FlinkLogicalSink

表示SQL里的写

FlinkLogicalSnapshot

SQL 语句中的 AS OF 子句的逻辑运算符。AS OF 子句用于对流数据进行快照操作,从而在处理数据时可以引用特定时间点的数据快照

def convert(rel: RelNode): RelNode = {
    val snapshot = rel.asInstanceOf[LogicalSnapshot]
    val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)
    snapshot.getPeriod match {
      case _: RexFieldAccess =>
        FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)
      case _: RexLiteral =>
        newInput
    }
  }

FlinkLogicalSort

表示SQL里的排序

FlinkLogicalUnion

表示SQL里的union 操作

 override def matches(call: RelOptRuleCall): Boolean = {
    val union: LogicalUnion = call.rel(0)
    union.all
  }

  override def convert(rel: RelNode): RelNode = {
    val union = rel.asInstanceOf[LogicalUnion]
    val newInputs = union.getInputs.map {
      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
    }
    FlinkLogicalUnion.create(newInputs, union.all)
  }

FlinkLogicalValues

SQL 中 VALUES 表达式的逻辑运算符。VALUES 表达式允许在查询中直接定义一组值,这在需要构造临时数据或进行简单的数据输入时非常有用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1879914.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

原来“山水博客“的分类也是可以拖动排序的

这二天一直用“山水博客”写文章,发现一个问题,好象它的分类不能调整位置,这可是个大bug。首先,界面上没发现拖动相关按钮;如果按住分类拖动,会成这样: 后来仔细看了它的文档,发现它…

弹性力学讲义

弹性力学讲义 1. 基本假设和一些概念2. 应力3. 二维应力状态与摩尔库伦屈服准则 1. 基本假设和一些概念 力学:变形体力学–固体力学和流体力学(连续介质力学) 刚体力学–理论力学(一般力学) 物理受理后:要…

Forecasting from LiDAR via Future Object Detection

Forecasting from LiDAR via Future Object Detection 基础信息 论文:cvpr2022paper https://openaccess.thecvf.com/content/CVPR2022/papers/Peri_Forecasting_From_LiDAR_via_Future_Object_Detection_CVPR_2022_paper.pdfgithub:https://github.co…

聚焦 HW 行动,构筑重保邮件安全防线

随着信息技术的飞速发展,网络安全已成为国家安全的重要组成部分。HW行动作为国家级网络安全演练,通过模拟实战攻防,检验和提升国家关键信息基础设施的防护能力。 CACTER凭借多年HW防护经验,提供全面的邮件安全防护体系&#xff0…

RPC远程过程调用--Thrift

RPC远程过程调用–Thrift 简介 Thrift是一个由Facebook开发的轻量级、跨语言的远程服务调用框架,后进入Apache开源项目。支持通过自身接口定义语言IDL定义RPC接口和数据类型,然后通过编译器生成不同语言代码,用于构建抽象易用、可互操作的R…

从头开始构建一个小规模的文生视频模型

OpenAI 的 Sora、Stability AI 的 Stable Video Diffusion 以及许多其他已经发布或未来将出现的文本生成视频模型,是继大语言模型 (LLM) 之后 2024 年最流行的 AI 趋势之一。 在这篇博客中,作者将展示如何将从头开始构建一个小规模的文本生成视频模型&a…

Web后端开发之前后端交互

http协议 http ● 超文本传输协议 (HyperText Transfer Protocol)服务器传输超文本到本地浏览器的传送协议 是互联网上应用最为流行的一种网络协议,用于定义客户端浏览器和服务器之间交换数据的过程。 HTTP是一个基于TCP/IP通信协议来传递数据. HTT…

成绩发布背后:老师的无奈与痛点

在教育的广阔天地里,教师这一角色承载着无数的期望与责任。他们不仅是知识的传播者,更是学生心灵的引路人。而对于班主任老师来说,他们的角色更加多元,他们不仅是老师,还必须是“妈妈”。除了像其他老师一样备课、上课…

Web3 前端攻击:原因、影响及经验教训

DeFi的崛起引领了一个创新和金融自由的新时代。然而,这种快速增长也吸引了恶意行为者的注意,他们试图利用漏洞进行攻击。尽管很多焦点都集中在智能合约安全上,但前端攻击也正在成为一个重要的威胁向量。 前端攻击的剖析 理解攻击者利用前端漏…

MaxKb/open-webui+Ollama运行模型

准备:虚拟机:centos7 安装Docker:首先,需要安装Docker,因为Ollama和MaxKB都是基于Docker的容器。使用以下命令安装Docker: sudo yum install -y yum-utils device-mapper-persistent-data lvm2 sudo yum…

Keil汇编相关知识

一、汇编的组成 1.汇编指令:在内存中占用内存,执行一条汇编指令会让处理器进行相关运算 分类:数据处理指令,跳转指令,内存读写指令,状态寄存器传送指令,软中断产生指令,协助处理器…

生成式AI如何赋能教育?商汤发布《2024生成式AI赋能教育未来》白皮书

生成式AI正在各个行业中展现出巨大的应用前景。在关系国计民生的教育行业,生成式AI能够催生哪些创新模式? 6月28日,商汤科技受邀参加2024中国AIGC应用与发展峰会,并在会上发布《2024生成式AI赋能教育未来》白皮书,提出…

Django之阿里云短信

短信验证 短信验证,首先得选择一个短信发送服务器上,本文档使用阿里云实现短信发送功能 阿里云短信网 网址:短信服务_企业短信营销推广_验证码通知-阿里云 注册账号 新账号赠送100条,可以不用充值,即可进行测试 接入 短信 进行 个人实名认证 编写代码执行 安装依赖模块 p…

html5 video去除边框

video的属性: autoplay 视频在就绪后自动播放。 controls 显示控件,比如播放按钮。 height 设置视频播放器的高度。 width 设置视频播放器的宽度。 loop 循环播放 muted 视频的音频输出静音。 poster 视频加载时显示的图像,或者在用户点击播…

全球最大智能立体书库|北京:3万货位,715万册,自动出库、分拣、搬运

导语 大家好,我是社长,老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》 北京城市图书馆的立体书库采用了先进的WMS(仓库管理系统)和WCS(仓库控制系统),与图书…

【机器学习】机器学习的重要技术——生成对抗网络:理论、算法与实践

引言 生成对抗网络(Generative Adversarial Networks, GANs)由Ian Goodfellow等人在2014年提出,通过生成器和判别器两个神经网络的对抗训练,成功实现了高质量数据的生成。GANs在图像生成、数据增强、风格迁移等领域取得了显著成果…

老师期末工作怎么减负?

期末,一个学期的尾声,也是老师们最为忙碌的时刻。在这段时间里,我们不仅要完成教学任务,还要准备期末考试、批改试卷、撰写学生评语、制定假期计划等一系列繁重的工作。那么,如何在这样紧张的期末工作中为自己减负呢&a…

Android高级面试_2_IPC相关

Android 高级面试-3:语言相关 1、Java 相关 1.1 缓存相关 问题:LruCache 的原理? 问题:DiskLruCache 的原理? LruCache 用来实现基于内存的缓存,LRU 就是最近最少使用的意思,LruCache 基于L…

RocketMQ源码学习笔记:Producer启动流程

这是本人学习的总结,主要学习资料如下 马士兵教育rocketMq官方文档 目录 1、Overview1.1、创建MQClientInstance1.1.1、检查1.1.1、MQClientInstance的ID 1.2、MQClientInstance.start() 1、Overview 这是发送信息的代码样例, DefaultMQProducer produ…

百强韧劲,进击新局 2023年度中国医药工业百强系列榜单发布

2024年,经济工作坚持稳中求进、以进促稳、先立后破等工作要求。医药健康行业以不懈进取的“韧劲”,立身破局,迎变启新。通过创新和迭代应对不确定性,进化韧性力量,坚持高质量发展,把握新时代经济和社会给予…