一文速通calcite结合flink理解SQL从文本变成执行计划详细过程

news2025/2/23 14:25:25

文章目录

      • 你可以学到啥
      • 测试代码
      • 背景知识
      • SQL转变流程图
      • 问题

你可以学到啥

  • SQL如何一步步变成执行计划的
  • 有哪些优化器,哪些优化规则
  • calcite 和flink 如何结合的

测试代码

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
        Schema schema = Schema.newBuilder().column("count", DataTypes.INT()).column("word", DataTypes.STRING()).build();
        Schema schema1 = Schema.newBuilder().column("id", DataTypes.INT()).column("name", DataTypes.STRING()).build();
        tableEnvironment.createTemporaryTable("aa_user", TableDescriptor.forConnector("filesystem").schema(schema)
                .option("path", "/Users/xx/IdeaProjects/flink-demo/data/order.csv").format("csv").build());

        tableEnvironment.createTemporaryTable("bb_order", TableDescriptor.forConnector("filesystem").schema(schema1)
                .option("path", "/Users/xx/IdeaProjects/flink-demo/data/user.csv").format("csv").build());

        String cost = tableEnvironment.explainSql("select * from aa_user inner join bb_order on `aa_user`.`count`=`bb_order`.`id`", ExplainDetail.ESTIMATED_COST);
        System.out.println(cost);

背景知识

需要了解calcite 里的基本知识,如AST,RelNode ,hepPlanner等等。
需要了解Flink 和Flink SQL里的一些知识

SQL转变流程图

SQL经过flink 里注册的每一个优化器,优化后,就能变成物理计划了,不过要变成执行代码,还要再经过代码生成。
在这里插入图片描述

问题

  • 问题1,FlinkBatchProgram
    所有flink优化器都是在这个类里添加的
object FlinkBatchProgram {
  val SUBQUERY_REWRITE = "subquery_rewrite"
  val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"
  val DECORRELATE = "decorrelate"
  val DEFAULT_REWRITE = "default_rewrite"
  val PREDICATE_PUSHDOWN = "predicate_pushdown"
  val JOIN_REORDER = "join_reorder"
  val JOIN_REWRITE = "join_rewrite"
  val PROJECT_REWRITE = "project_rewrite"
  val WINDOW = "window"
  val LOGICAL = "logical"
  val LOGICAL_REWRITE = "logical_rewrite"
  val TIME_INDICATOR = "time_indicator"
  val PHYSICAL = "physical"
  val PHYSICAL_REWRITE = "physical_rewrite"
  val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning"
  val RUNTIME_FILTER = "runtime_filter
  }
  • 问题2,calcite 优化器和flink 如何结合的
    logical,physical 这两个优化器都是用的VolcanoPlanner,结合规则和代价。
    剩下的优化器HepPlanner,HepPlanner 完全使用规则。

  • 问题3,project_rewrite 后,为啥少了LogicalProject ReNode ?
    因为最后一个操作,logicalproject 这里就是把所有的字段查出来了,所有这一步实际上是不用的

  • 问题4,物理计划如何生成执行代码的?
    BatchPhysicalTableSourceScan 类

class BatchPhysicalTableSourceScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    hints: util.List[RelHint],
    tableSourceTable: TableSourceTable)
  extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)
  with BatchPhysicalRel {

  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
    val rowCnt = mq.getRowCount(this)
    if (rowCnt == null) {
      return null
    }
    val cpu = 0
    val rowSize = mq.getAverageRowSize(this)
    val size = rowCnt * rowSize
    planner.getCostFactory.makeCost(rowCnt, cpu, size)
  }

  // 这里生成的执行代码
  override def translateToExecNode(): ExecNode[_] = {
    val tableSourceSpec = new DynamicTableSourceSpec(
      tableSourceTable.contextResolvedTable,
      util.Arrays.asList(tableSourceTable.abilitySpecs: _*))
    tableSourceSpec.setTableSource(tableSourceTable.tableSource)

    new BatchExecTableSourceScan(
      unwrapTableConfig(this),
      tableSourceSpec,
      FlinkTypeFactory.toLogicalRowType(getRowType),
      getRelDetailedDescription)
  }
}
  • 问题5,为啥aa_user 表被广播,哪里实现的?

BatchPhysicalHashJoinRule 规则实现的

核心代码

 val leftSize = JoinUtil.binaryRowRelNodeSize(join.getLeft)
      val rightSize = JoinUtil.binaryRowRelNodeSize(join.getRight)

      // if it is not with hint, just check size of left and right side by statistic and config
      // if leftSize or rightSize is unknown, cannot use broadcast
      if (leftSize == null || rightSize == null) {
        return (false, false)
      }

      val threshold =
        tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)

      val rightSizeSmallerThanThreshold = rightSize <= threshold
      val leftSizeSmallerThanThreshold = leftSize <= threshold
      val leftSmallerThanRight = leftSize < rightSize

      join.getJoinType match {
        case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)
        case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)
        case JoinRelType.FULL => (false, false)
        case JoinRelType.INNER =>
          (
            leftSizeSmallerThanThreshold
              || rightSizeSmallerThanThreshold,
            leftSmallerThanRight)
        // left side cannot be used as build side in SEMI/ANTI join.
        case JoinRelType.SEMI | JoinRelType.ANTI =>
          (rightSizeSmallerThanThreshold, false)
      }

主要就是实现

  def binaryRowRelNodeSize(relNode: RelNode): JDouble = {
    val mq = relNode.getCluster.getMetadataQuery
    val rowCount = mq.getRowCount(relNode)
    if (rowCount == null) {
      null
    } else {
      rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
    }
  }

最后还是到了FlinkRelMdColumnNullCount 这个类
从这个ts: TableScan 对象里取出来
那ts 对象又是在哪里赋值的,看这个FlinkRecomputeStatisticsProgram 类

class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] {

  override def getDef: MetadataDef[ColumnNullCount] = FlinkMetadata.ColumnNullCount.DEF

  /**
   * Gets the null count of the given column in TableScan.
   *
   * @param ts
   *   TableScan RelNode
   * @param mq
   *   RelMetadataQuery instance
   * @param index
   *   the index of the given column
   * @return
   *   the null count of the given column in TableScan
   */
  def getColumnNullCount(ts: TableScan, mq: RelMetadataQuery, index: Int): JDouble = {
    Preconditions.checkArgument(mq.isInstanceOf[FlinkRelMetadataQuery])
    val relOptTable = ts.getTable.asInstanceOf[FlinkPreparingTableBase]
    val fieldNames = relOptTable.getRowType.getFieldNames
    Preconditions.checkArgument(index >= 0 && index < fieldNames.size())
    val fieldName = fieldNames.get(index)
    val statistic = relOptTable.getStatistic
    val colStats = statistic.getColumnStats(fieldName)
    if (colStats != null && colStats.getNullCount != null) {
      colStats.getNullCount.toDouble
    } else {
      null
    }
  }
  }

ts是在这里赋值,这里最后会用调用具体的文件系统,找到文件行数

 private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {
        final RelOptTable scanTable = scan.getTable();
        if (!(scanTable instanceof TableSourceTable)) {
            return scan;
        }

        FlinkContext context = ShortcutUtils.unwrapContext(scan);
        TableSourceTable table = (TableSourceTable) scanTable;
        boolean reportStatEnabled =
                context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED)
                        && table.tableSource() instanceof SupportsStatisticReport;

        SourceAbilitySpec[] specs = table.abilitySpecs();
        PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, PartitionPushDownSpec.class);

        FilterPushDownSpec filterPushDownSpec = getSpec(specs, FilterPushDownSpec.class);
        TableStats newTableStat =
                recomputeStatistics(
                        table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled);
        FlinkStatistic newStatistic =
                FlinkStatistic.builder()
                        .statistic(table.getStatistic())
                        .tableStats(newTableStat)
                        .build();
        TableSourceTable newTable = table.copy(newStatistic);
        return new LogicalTableScan(
                scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable);
    }
  • 问题 6 优化后的relnode如何生成执行计划的
override def translate(
      modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
    beforeTranslation()
    if (modifyOperations.isEmpty) {
      return List.empty[Transformation[_]]
    }

    val relNodes = modifyOperations.map(translateToRel)
    val optimizedRelNodes = optimize(relNodes)
    //就是这里
    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
    val transformations = translateToPlan(execGraph)
    afterTranslation()
    transformations
  }

所有物理计划的relnode 都extend于FlinkPhysicalRel

 public ExecNodeGraph generate(List<FlinkPhysicalRel> relNodes, boolean isCompiled) {
        List<ExecNode<?>> rootNodes = new ArrayList<>(relNodes.size());
        for (FlinkPhysicalRel relNode : relNodes) {
            rootNodes.add(generate(relNode, isCompiled));
        }
        return new ExecNodeGraph(rootNodes);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2146428.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Shein西班牙开放平台模式广受市场欢迎,Shein适合卖什么产品?

Shein是一家专注于女性快时尚的跨境B2C互联网企业&#xff0c;主要面向欧美、中东等消费市场。经过海外十余年的发展&#xff0c;Shein在全球积累了数量庞大且仍在高速增长的广泛用户基础。目前Shein已开放自营商家及平台卖家两大合作模式&#xff0c;通过入驻Shein平台&#x…

23ai DGPDB,Oracle资源池战略的最后一块拼图

Oracle对资源池是有执念的&#xff01; 在我看来&#xff0c;这种执念一方面是应用架构的微服务化&#xff0c;数据库被拆分的越来越小&#xff0c;而服务器的硬件能力是不断提升的&#xff0c;CPU核心数、内存和存储的容量都按照摩尔定律在不断增加&#xff0c;这就使得数据库…

QTAndroid编译环境配置

开始 QT 官网的安装教程安装&#xff0c;经过测试有部分小错误。以下是结合教程和网上搜集的一些材料最后安装成功的步骤。 SDK和JDKhttp://链接: https://pan.baidu.com/s/13CImHLAoUFAdecF2BVsBlQ?pwd627g 提取码: 627ghttp://链接: https://pan.baidu.com/s/13CImHLAoUFAd…

Git 代码撤销、回滚到任意版本(当误提代码到本地或master分支时)

两种情况&#xff08;场景&#xff09; 情况一 代码还只在本地&#xff0c;未push到运程仓库&#xff0c;想把代码还原到上一次commit的代码&#xff0c;此时操作为代码撤销 解决方案&#xff1a; git reset [--hard|soft|mixed|merge|keep] [commit|HEAD] 情况二 …

Axure设计之表格列冻结(动态面板+中继器)

在Web端产品设计中&#xff0c;复杂的表格展示是常见需求&#xff0c;尤其当表格包含大量列时&#xff0c;如何在有限的屏幕空间内优雅地展示所有信息成为了一个挑战。用户通常需要滚动查看隐藏列&#xff0c;但关键信息列&#xff08;如ID、操作按钮等&#xff09;在滚动时保持…

十三 系统架构设计(考点篇)

1 软件架构的概念 一个程序和计算系统软件体系结构是指系统的一个或者多个结构。结构中包括软件的构件&#xff0c;构件 的外部可见属性以及它们之间的相互关系。 体系结构并非可运行软件。确切地说&#xff0c;它是一种表达&#xff0c;使软件工程师能够&#xff1a; (1)分…

VMware vSphere 8.0 Update 3b 发布下载,新增功能概览

VMware vSphere 8.0 Update 3b 发布下载&#xff0c;新增功能概览 vSphere 8.0U3 | ESXi 8.0U3 & vCenter Server 8.0U3 请访问原文链接&#xff1a;https://sysin.org/blog/vmware-vsphere-8-u3/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页…

汽车软件开发之敏捷开发

一、前言 目前汽车电子产品&#xff0c;特别是汽车几大域控&#xff08;如&#xff1a;智能座舱、智能驾驶、智能网联、车身控制&#xff09;市场竞争激烈&#xff0c;消费者对汽车的需求逐渐多元化和个性化&#xff0c;用户对座舱和智驾产品的要求也越来越高。他们不仅要求产…

人工智能时代,我们依旧有无限的选择权!

人工智能时代&#xff0c;即有人两眼放光&#xff0c;又有人忧心忡忡。前者看到大量的机遇、蓝海&#xff0c;后者看到了失业和糟糕的未来&#xff0c;亦或是有人有喜有忧。但是只要你知晓一个真谛&#xff1a;凡事皆有利有弊&#xff0c;那便不用内耗了。或是选择当前的生活节…

SAP B1 - 新账套设置密码不过期

背景 建立新账套后&#xff0c;每隔一段时间就会自动弹出以下要求更改密码的提示窗口&#xff0c;最讨厌的是系统会记住你的所有历史密码&#xff0c;新设置密码不能与所有曾用密码相同。找到终止该自动更换密码的设置&#xff0c;遂总结为经验帖。 操作 点击位于顶栏的小人图…

示例:WPF中Grid显示网格线的几种方式

一、目的&#xff1a;介绍一下WPF中Grid显示网格线的几种方式 二、几种方式 1、重写OnRender绘制网格线&#xff08;推荐&#xff09; 效果如下&#xff1a; 实现方式如下&#xff1a; public class LineGrid : Grid{private readonly Pen _pen;public LineGrid(){_pen new P…

C#实战|大乐透选号器[11]:确认选号功能的实现

哈喽,你好啊,我是雷工! 接着练习大乐透选号器的功能,这里练习实现确认选号功能; 以下为实现笔记; 01 效果演示 实现点击确认选号,将机选或手动选出的号码,添加到列表中显示; 02 设置DataGridView 首先设置控件GataGridView的属性; 2.1、编辑列 选中控件,点击右上角的…

最新Kali Linux超详细安装教程(附镜像包)

一、镜像下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1BfiyAMW6E1u9fhfyv8oH5Q 提取码&#xff1a;tft5 二、配置虚拟机 这里我们以最新的vm17.5为例。进行配置 1.创建新的虚拟机&#xff1a;选择自定义 2.下一步 3.选择稍后安装操作系统 4.选择Debian版本 因…

AI绘画实操 Stable Diffusion 到底怎么玩儿,新手必看的AI绘画入门安装使用教程

大家好&#xff0c;我是灵魂画师向阳 2024年&#xff0c;是AI绘画技术飞速发展的一年&#xff0c;各种AI绘画工具层出不穷&#xff0c;为了让大家在了解和学习AI绘画的过程中少走弯路&#xff0c;今天我将详细介绍目前世界上使用用户最多&#xff0c;社区最大&#xff0c;生态…

Springboot与minio

一、介绍 Minio是一个简单易用的云存储服务&#xff0c;它让你可以轻松地把文件上传到互联网上&#xff0c;这样无论你在哪里&#xff0c;只要有网络&#xff0c;就能访问或分享这些文件。如果你想要从这个仓库里取出一张图片或一段视频&#xff0c;让网站的访客能看到或者下载…

遭华尔街大幅看好,收入高速增长但面临困境,Zillow股票还能买入吗?

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 猛兽财经的核心观点&#xff1a; &#xff08;1&#xff09;Wedbush已经将Zillow的评级上调为“跑赢大盘”&#xff0c;目标价为80美元。 &#xff08;2&#xff09;第二季度业绩强劲&#xff0c;收入继续保持着两位数增长…

esp32-C2 对接火山引擎实现语音转文本(二)

目录 一、 语音转文本初始化 二、 WedStream 事件处理函数 一、 语音转文本初始化 Volcengine_vtt_handle_t Volcengine_Vtt_Init(Volcengine_vtt_config_t *config) {// 管道配置audio_pipeline_cfg_t pipeline_cfg = DEFAULT_AUDIO_PIPELINE_CONFIG();Volcengine_vtt_t *vt…

架构师:在 Spring Cloud 中实现全局异常处理的技术指南

1、简述 在分布式系统中,微服务架构是最流行的设计模式之一。Spring Cloud 提供了各种工具和库来简化微服务的开发和管理。然而,随着服务的增多,处理每个服务中的异常变得尤为复杂。因此,实现统一的全局异常处理成为了关键。本篇博客将介绍如何在 Spring Cloud 微服务架构…

DevExpress中文教程:如何将WinForms数据网格连接到ASP. NET Core WebAPI服务?

日前DevExpress官方发布了DevExpress WinForms的后续版本——将.NET桌面客户端连接到安全后端Web API服务(EF Core with OData)&#xff0c;在本文中我们将进一步演示如何使用一个更简单的服务来设置DevExpress WinForms数据网格。 P.S&#xff1a;DevExpress WinForms拥有180…

关于github GPG的配置

GitHub 使用 OpenPGP 库来确认本地签名的提交和标记&#xff0c;是否根据你在 GitHub.com 上添加到帐户的公钥进行加密验证。 这里是github关于GPG密钥的文档&#xff1a;https://docs.github.com/zh/authentication/managing-commit-signature-verification/about-commit-sig…