解析 flink sql 转化成flink job

news2025/1/12 12:08:31

文章目录

    • 背景
    • 流程
    • flink实例
    • 实现细节
      • 定义的规则
      • 定义的物理算子
      • 定义的flink exec node

背景

在很多计算引擎里,都会把sql 这种标准语言,转成计算引擎下底层实际的算子,因此理解此转换的流程对于理解整个过程非常重要

流程

在这里插入图片描述

flink实例

public class BatchExample {


    public static void main(String[] args) {
        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // 创建一个内置示例源表
        String sourceDDL = "CREATE TABLE users (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    age INT\n" +
                ") WITH (\n" +
                "    'connector' = 'filesystem',\n" +
                "    'path' = 'file:///Users/leishuiyu/IdeaProjects/SpringFlink/data.csv',\n" +
                "    'format' = 'csv'\n" +
                ");";
        tableEnv.executeSql(sourceDDL);


        Table table = tableEnv.sqlQuery("select * from users limit 1 ");


        String explanation = tableEnv.explainSql("select * from users limit 1 ");
        System.out.println(explanation);
        table.execute().print();
    }
}

输出结果

== Abstract Syntax Tree ==
LogicalSort(fetch=[1])
+- LogicalProject(id=[$0], name=[$1], age=[$2])
   +- LogicalTableScan(table=[[default_catalog, default_database, users]])

== Optimized Physical Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])
   +- Limit(offset=[0], fetch=[1], global=[false])
      +- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])

== Optimized Execution Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])
   +- Limit(offset=[0], fetch=[1], global=[false])
      +- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])

实现细节

主要是三个地方,在优化那一步,就把原生的relnode 转化成了自定义的relnode,自定义的relnode 就可以带物理转化的内容了,比如上面的LogicalTableScan 转成BatchPhysicalTableSourceScan 这个relnode

定义的规则

class BatchPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {

  /** Rule must only match if TableScan targets a bounded [[ScanTableSource]] */
  //规则只匹配有界的ScanTableSource
  override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
    tableSourceTable match {
      case tst: TableSourceTable =>
        tst.tableSource match {
          case sts: ScanTableSource =>
            sts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE).isBounded
          case _ => false
        }
      case _ => false
    }
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
    val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
    //在这里转成自定义的relnode 
    new BatchPhysicalTableSourceScan(
      rel.getCluster,
      newTrait,
      scan.getHints,
      scan.getTable.asInstanceOf[TableSourceTable]
    )
  }
}

定义的物理算子

也是一个relnode,实现类BatchPhysicalTableSourceScan

class BatchPhysicalTableSourceScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    hints: util.List[RelHint],
    tableSourceTable: TableSourceTable)
  extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)
  with BatchPhysicalRel {
//主要是这个方法,转成 flink exec算子
  override def translateToExecNode(): ExecNode[_] = {
    val tableSourceSpec = new DynamicTableSourceSpec(
      tableSourceTable.contextResolvedTable,
      util.Arrays.asList(tableSourceTable.abilitySpecs: _*))
    tableSourceSpec.setTableSource(tableSourceTable.tableSource)
    
    new BatchExecTableSourceScan(
      unwrapTableConfig(this),
      tableSourceSpec,
      FlinkTypeFactory.toLogicalRowType(getRowType),
      getRelDetailedDescription)
  }
}

定义的flink exec node

BatchExecTableSourceScan 类

 /// 主要是这个方法,看下下面的实现就比较熟悉了
 public Transformation<RowData> createInputFormatTransformation(
            StreamExecutionEnvironment env,
            InputFormat<RowData, ?> inputFormat,
            InternalTypeInfo<RowData> outputTypeInfo,
            String operatorName) {
        // env.createInput will use ContinuousFileReaderOperator, but it do not support multiple
        // paths. If read partitioned source, after partition pruning, we need let InputFormat
        // to read multiple partitions which are multiple paths.
        // We can use InputFormatSourceFunction directly to support InputFormat.
       
        final InputFormatSourceFunction<RowData> function =
                new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);
        return env.addSource(function, operatorName, outputTypeInfo).getTransformation();
    }

这里的转换是多种方式,一种是现成的比如source 这种,还有的是函数这种,要通过代码生成的方法实现。flink代码生成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1854303.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

绘制口罩maskTheFace数据源是300w_lp

官网下载mask the face 代码&#xff0c;增加代码draw_face.py import argparse import cv2 import scipy.io from tqdm import tqdm from utils.aux_functions_2 import *# 设置命令行输入参数 parser argparse.ArgumentParser(description"MaskTheFace - Python code…

C++的特殊类设计 饥饿汉模式

目录 特殊类设计 设计一个不能被拷贝的类 设计一个只能在堆上创建对象的类 设计一个只能在栈上创建对象的类 设计一个不能继承的类 设计模式 单例模式 饿汉模式 饥汉模式 特殊类设计 设计一个不能被拷贝的类 C98的设计方式&#xff1a;将该类的拷贝构造和赋值运算符…

OpenGL3.3_C++_Windows(17)

Demo演示 demo演示 绘制不同的图元&#xff08;点&#xff0c;线…&#xff09;&#xff1a; 理解 glDrawArrays 和 glDrawElements的区别 glDrawArrays &#xff1a;渲染的图元模式mode&#xff08;可以参考&#xff09;&#xff0c;起始位置&#xff0c;顶点数量glDrawElem…

昇思25天学习打卡营第2天|张量Tensor

一、张量的定义&#xff1a; 张量是一种特殊的数据结构&#xff0c;与数组和矩阵非常相似。张量&#xff08;Tensor&#xff09;是MindSpore网络运算中的基本数据结构&#xff08;也是所有深度学习模型的基础数据结构&#xff09;&#xff0c;下面将主要介绍张量和稀疏张量的属…

Maven的依赖传递、依赖管理、依赖作用域

在Maven项目中通常会引入大量依赖&#xff0c;但依赖管理不当&#xff0c;会造成版本混乱冲突或者目标包臃肿。因此&#xff0c;我们以SpringBoot为例&#xff0c;从三方面探索依赖的使用规则。 1、 依赖传递 依赖是会传递的&#xff0c;依赖的依赖也会连带引入。例如在项目中…

大型企业网络DHCP服务器配置安装实践@FreeBSD

企业需求 需要为企业里的机器配置一台DHCP服务器。因为光猫提供DHCP服务的能力很差&#xff0c;多机器dhcp多机器NAT拓扑方式机器一多就卡顿。使用一台路由器来进行子网络的dhcp和NAT服务&#xff0c;分担光猫负载&#xff0c;但是还有一部分机器需要放到光猫网络&#xff0c;…

一、企业级架构设计-archimate基础概念

目录 一、标准 二、实现工具 1、Archimate 1、Archimate 基本概念 1、通用元模型 2、结构关系 3、依赖关系 1、服务关系 2、访问关系 3、影响关系 1、影响方式 2、概念 3、关系线 4、案例 4、关联关系 4、动态、节点和其他关系 1、时间或因果关系 2、信息流 …

ubuntu18.04 编译HBA 并实例运行

HBA是一个激光点云层级式的全局优化的程序&#xff0c;他的论文题目是&#xff1a;HBA: A Globally Consistent and Efficient Large-Scale LiDAR Mapping Module&#xff0c;对应的github地址是&#xff1a;HKU-Mars-Lab GitHub 学习本博客&#xff0c;可以学到gtsam安装&am…

6.S081的Lab学习——Lab8: locks

文章目录 前言一、Memory allocator(moderate)提示&#xff1a;解析 二、Buffer cache(hard)解析&#xff1a; 三、Barrier (moderate)解析&#xff1a; 总结 前言 一个本硕双非的小菜鸡&#xff0c;备战24年秋招。打算尝试6.S081&#xff0c;将它的Lab逐一实现&#xff0c;并…

[数据集][目标检测]药片药丸检测数据集VOC+YOLO格式152张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;152 标注数量(xml文件个数)&#xff1a;152 标注数量(txt文件个数)&#xff1a;152 标注类别…

Django 模版过滤器

Django模版过滤器是一个非常有用的功能&#xff0c;它允许我们在模版中处理数据。过滤器看起来像这样&#xff1a;{{ name|lower }}&#xff0c;这将把变量name的值转换为小写。 1&#xff0c;创建应用 python manage.py startapp app5 2&#xff0c;注册应用 Test/Test/sett…

ic基础|功耗篇03:ic设计人员如何在代码中降低功耗?一文带你了解行为级以及RTL级低功耗技术

大家好&#xff0c;我是数字小熊饼干&#xff0c;一个练习时长两年半的ic打工人。我在两年前通过自学跨行社招加入了IC行业。现在我打算将这两年的工作经验和当初面试时最常问的一些问题进行总结&#xff0c;并通过汇总成文章的形式进行输出&#xff0c;相信无论你是在职的还是…

【计算机网络篇】数据链路层(13)共享式以太网与交换式以太网的对比

文章目录 &#x1f354;共享式以太网与交换式以太网的对比&#x1f50e;主机发送单播帧的情况&#x1f50e;主机发送广播帧的情况&#x1f50e;多对主机同时通信 &#x1f6f8;使用集线器和交换机扩展共享式以太网的区别 &#x1f354;共享式以太网与交换式以太网的对比 下图是…

基于STM32的智能家居安防系统

目录 引言环境准备智能家居安防系统基础代码实现&#xff1a;实现智能家居安防系统 4.1 数据采集模块4.2 数据处理与分析4.3 控制系统实现4.4 用户界面与数据可视化应用场景&#xff1a;智能家居安防管理与优化问题解决方案与优化收尾与总结 1. 引言 智能家居安防系统通过使…

使用J-Link Commander查找STM32死机问题

接口:PA13,PA14&#xff0c;请勿连接复位引脚。 输入usb命令这里我已经连接过了STM32F407VET6了。 再输入connect命令这里我已经默认选择了SWD接口&#xff0c;4000K速率。 可以输入speed 4000命令选择4000K速率: 写一段崩溃代码进行测试: void CashCode(void){*((volatil…

springboot+vue+mybatis旅游管理+PPT+论文+讲解+售后

随着人民生活水平的提高,旅游业已经越来越大众化,而旅游业的核心是信息,不论是对旅游管理部门、对旅游企业,或是对旅游者而言,有效的获取旅游信息,都显得特别重要.旅游管理系统将使旅游相关信息管理工作规范化、信息化、程序化,提供旅游景点、旅游线路,旅游新闻等服务本文以jsp…

笔记本更换固态,保留数据,无需重装系统和软件

一、问题描述&#xff1a; 原有一块128GB的固态硬盘作为c盘使用&#xff0c;由于工作学习需要&#xff0c;经常跑虚拟机&#xff0c;现在需要升级容量。 二、解决思路&#xff1a; 硬件 购买一款大容量的固态硬盘 不同的容量有不同的价格&#xff0c;这个根据预算和实际需要来…

【C#】使用数字和时间方法ToString()格式化输出字符串显示

在C#编程项目开发中&#xff0c;几乎所有对象都有格式化字符串方法&#xff0c;其中常见的是数字和时间的格式化输出多少不一样&#xff0c;按实际需要而定吧&#xff0c;现记录如下&#xff0c;以后会用得上。 文章目录 数字格式化时间格式化 数字格式化 例如&#xff0c;保留…

Python 虚拟环境 requirements.txt 文件生成 ;pipenv导出pip安装文件

搜索关键词: Python 虚拟环境Pipenv requirements.txt 文件生成;Pipenv 导出 pip requirements.txt安装文件 本文基于python版本 >3.9 文章内容有效日期2023年01月开始(因为此方法从这个时间开始是完全ok的) 上述为pipenv的演示版本 使用以下命令可精准生成requirement…

【windows解压】解压文件名乱码

windows解压&#xff0c;文件名乱码但内容正常。 我也不知道什么时候设置出的问题。。。换了解压工具也没用&#xff0c;后来是这样解决的。 目录 1.环境和工具 2.打开【控制面板】 3.点击【时钟和区域】 4.选择【区域】 5.【管理】中【更改系统区域设置】 6.选择并确定…