(二开)Flink 修改源码拓展 SQL 语法

news2025/1/11 12:00:40
1、Flink 扩展 calcite 中的语法解析
1)定义需要的 SqlNode 节点类-以 SqlShowCatalogs 为例
a)类位置

flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java

在这里插入图片描述

核心方法

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("SHOW CATALOGS");
    }
b)类血缘

在这里插入图片描述

2)修改 includes 目录下的 .ftl 文件,在 parserImpls.ftl 文件中添加语法逻辑
a)文件位置

在这里插入图片描述

b)语法示例
/**
* Parse a "Show Catalogs" metadata query command.
*/
SqlShowCatalogs SqlShowCatalogs() :
{
}
{
    <SHOW> <CATALOGS>
    {
        return new SqlShowCatalogs(getPos());
    }
}
3)将 Calcite 源码中的 config.fmpp 文件复制到项目的 src/main/codegen 目录下,修改内容,来声明扩展的部分
a)文件位置

在这里插入图片描述

b)config.fmpp 内容
data: {
	# 解析器文件路径
  parser: tdd(../data/Parser.tdd)
}

# 扩展文件的目录
freemarkerLinks: {
  includes: includes/
}
c)Parser.tdd 部分内容
# 生成的解析器包路径
package: "org.apache.flink.sql.parser.impl",
# 解析器名称
class: "FlinkSqlParserImpl",
# 引入的依赖类
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
# 新的关键字
keywords: [
    "CATALOGS"
  ]
# 新增的语法解析方法
statementParserMethods: [
    "SqlShowCatalogs()"
  ]
# 包含的扩展语法文件
implementationFiles: [
    "parserImpls.ftl"
  ]
4)编译模板文件和语法文件

在这里插入图片描述

5)配置扩展的解析器类
withParserFactory(FlinkSqlParserImpl.FACTORY)
2、自定义扩展 Flink 的 Parser 语法
1)定义 SqlNode 类
package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** XSHOW CATALOGS sql call. */
public class SqlXShowCatalogs extends SqlCall {
    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("XSHOW CATALOGS", SqlKind.OTHER);

    public SqlXShowCatalogs(SqlParserPos pos) {
        super(pos);
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
        return Collections.emptyList();
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("XSHOW CATALOGS");
    }
}

2)修改 includes 目录下的 parserImpls.ftl 文件
/**
* Parse a "XShow Catalogs" metadata query command.
*/
SqlXShowCatalogs SqlXShowCatalogs() :
{
}
{
    <XSHOW> <CATALOGS>
    {
       return new SqlXShowCatalogs(getPos());
    }
}
3)修改 Parser.tdd 文件,新增-声明拓展的部分
imports:

"org.apache.flink.sql.parser.dql.SqlXShowCatalogs"

keywords:

"XSHOW"

statementParserMethods:

"SqlXShowCatalogs()"
4)重新编译
 mvn generate-resources
5)执行测试用例

可以看到,自定义 SQL 的报错,由解析失败,变为了校验失败。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {
    public static void main(String[] args) throws Exception {

        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build());
				
				// 拓展自定义语法 xshow catalogs 前
        // SQL parse failed. Non-query expression encountered in illegal context
        tEnv.executeSql("xshow catalogs").print();

        // 拓展自定义语法 xshow catalogs 后
        // SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
    }
}

6)查看生成的扩展解析器类

可以看到,在 FlinkSqlParserImpl 中,自定义的解析语法已经生成了。

在这里插入图片描述

3、validate 概述

在向 Flink 中添加完自定义的解析规则后,报错信息如下:

SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
修改 validate 部分的代码
1)FlinkPlannerImpl#validate

作用:校验 SqlNode ,如果是 show catalogs 语法时直接返回。

在这里插入图片描述

sqlNode.isInstanceOf[SqlXShowCatalogs]
2)SqlToOperationConverter#convert

作用:将校验过的 SqlNode 转换为 Operator。

在这里插入图片描述

else if (validated instanceof SqlXShowCatalogs) {
            return Optional.of(converter.convertXShowCatalogs((SqlXShowCatalogs) validated));
}
3)SqlToOperationConverter#convertXShowCatalogs
/** Convert SHOW CATALOGS statement. */
private Operation convertXShowCatalogs(SqlXShowCatalogs sqlXShowCatalogs) {
     return new XShowCatalogsOperation();
}
4)XShowCatalogsOperation
package org.apache.flink.table.operations;

public class XShowCatalogsOperation implements ShowOperation {
    @Override
    public String asSummaryString() {
        return "SHOW CATALOGS";
    }
}
4、执行测试用例
package org.apache.flink.table.examples.java.custom;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {
    public static void main(String[] args) throws Exception {

        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build());

				// FlinkSQL原本支持的语法
        tEnv.executeSql("show catalogs").print();
        
        // 自定义语法
        tEnv.executeSql("xshow catalogs").print();
    }
}

在这里插入图片描述

5、总结-FlinkSQL 的执行流程
1、对 SQL 进行校验

final SqlNode validated = flinkPlanner.validate(sqlNode);

2、预校验重写 Insert 语句

3、调用 SqlNode.validate() 进行校验

	1)如果是:ExtendedSqlNode【SqlCreateHiveTable、SqlCreateTable、SqlTableLike】
	2)如果是:SqlKind.DDL、SqlKind.INSERT 等,无需校验,直接返回 SqlNode
	3)如果是:SqlRichExplain
	4)其它:validator.validate(sqlNode)
		
			1.校验作用域和表达式:validateScopedExpression(topNode, scope)
					a)将 SqlNode 进行规范化重写
          b)如果SQL是【TOP_LEVEL = concat(QUERY, DML, DDL)】,则在父作用域中注册查询
          c)校验 validateQuery 
          	i)validateFeature
          	ii)validateNamespace
          	iii)validateModality
          	iv)validateAccess
          	v)validateSnapshot
          d)如果SQL不是【TOP_LEVEL = concat(QUERY, DML, DDL)】进行类型推导
       
       2.获取校验之后的节点类型

2、将 SQLNode 转换为 Operation

converter.convertSqlQuery(validated)

	1)生成逻辑执行计划 RelNode
	RelRoot relational = planner.rel(validated);
		
		1.对查询进行转换
		sqlToRelConverter.convertQuery(validatedSqlNode)
		
	2)创建 PlannerQueryOperation
	new PlannerQueryOperation(relational.project());
	
3、将 Operation 转换为 List<Transformation<?>>
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));

	1)对 RelNode 逻辑执行计划进行优化,获取 optimizedRelNodes
	val optimizedRelNodes = optimize(relNodes)
	
	2)将 optimizedRelNodes 转换为 execGraph
	val execGraph = translateToExecNodeGraph(optimizedRelNodes)
	
	3)将 execGraph 转换为 transformations
	
		1.使用代码生成技术生成Function,后续可以反射调用
		val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1137699.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

redis缓存击穿 穿透

我们之前写了一把分布式锁 并且用redis写的, redis内部实现是比较完善的&#xff0c;但是我们公司用的时候 redis 至少都是主从&#xff0c;哨兵,cluster 很少有单机的 呢么我们分布式锁基于集群问题下会有什么问题 比如说当第一个线程设置一个key过来进行加锁&#xff0c;加锁…

html/css/javascript/js实现的简易打飞机游戏

源码下载地址 支持&#xff1a;远程部署/安装/调试、讲解、二次开发/修改/定制 视频浏览地址

Maven项目转为SpringBoot项目

Maven项目转为SpringBoot项目 前言创建一个maven项目前的软件的一些通用设置Maven仓库的设置其他的设置字符编码编译器注解支持 创建的Maven项目修改为Spring Boot项目修改pom.xml文件修改启动类-Main新建WAR包所需的类 添加核心配置文件 测试的控制器最后整个项目的目录结构![…

Bayes决策:身高与体重特征进行性别分类

代码与文件请从这里下载&#xff1a;Auorui/Pattern-recognition-programming: 模式识别编程 (github.com) 简述 分别依照身高、体重数据作为特征&#xff0c;在正态分布假设下利用最大似然法估计分布密度参数&#xff0c;建立最小错误率Bayes分类器&#xff0c;写出得到的决…

@AutoConfigurationPackage注解类

包名package org.springframework.boot.autoconfigure 方法 String[] basePackages() 向AutoConfigurationPackages中注册的基本包&#xff0c;使用basePackageClasses作为基于字符串的包的类型安全替代方案 Class<?>[] basePackageClasses() 键入basePackage…

VL10F后台生成发货单时报错:物料 XXXXX 状态被锁定/未激活(不允许发货)

错误原因&#xff1a;物料主数据&#xff1a;销售视图1中&#xff0c;物料的发货状态没有激活。MM02修改物料的发货状态后正常生成单据。

双十一值得买的数码产品、这几款都不容错过

一年一度的双11终于来了&#xff0c;相信很多朋友都打算在此次的双11入手自己想要的产品&#xff0c;作为一个数码爱好者&#xff0c;我也是在此次的双11入手了下面4款数码产品&#xff0c;一起来看看吧&#xff01; 1、不用入耳佩戴的开放式耳机 -官方售价&#xff1a;199 …

WebSocket协议:5分钟从入门到精通

一、内容概览 WebSocket的出现&#xff0c;使得浏览器具备了实时双向通信的能力。本文由浅入深&#xff0c;介绍了WebSocket如何建立连接、交换数据的细节&#xff0c;以及数据帧的格式。此外&#xff0c;还简要介绍了针对WebSocket的安全攻击&#xff0c;以及协议是如何抵御类…

互联网产品说明书指南,附撰写流程与方法

产品说明书&#xff0c;对于普通产品而言&#xff0c;再常见不过。药物、电器、电子产品等产品在正式出售时&#xff0c;往往都会附带一份产品说明书&#xff0c;以此告诉用户这个产品的功能与特性&#xff0c;并指导用户如何来使用这个产品。 产品说明书 那么&#xff0c;对于…

洗衣行业在线预约小程序+前后端完整搭建教程

大家好哇&#xff0c;好久不见&#xff01;今天源码师父来给大家推荐一款洗衣行业在线预约的小程序&#xff0c;带有前后端的完整搭建教程。 目前&#xff0c;人们对生活品质的追求不断提高&#xff0c;但生活节奏却也不断加快。对品质的追求遇到了忙碌的生活节奏&#xff0c;…

天锐绿盾终端安全管理系统

所谓透明&#xff0c;是指对使用者来说是未知的。当使用者在打开或编辑指定文件时&#xff0c;系统将自动对未加密的文件进行加密&#xff0c;对已加密的文件自动解密。文件在硬盘上是密文&#xff0c;在内存中是明文。一旦离开使用环境&#xff0c;由于应用程序无法得到自动解…

web安全-原发抗抵赖

原发抗抵赖 原发抗抵赖也称不可否认性&#xff0c;主要表现以下两种形式&#xff1a; 数据发送者无法否认其发送数据的事实。例如&#xff0c;A向B发信&#xff0c;事后&#xff0c;A不能否认该信是其发送的。数据接收者事后无法否认其收到过这些数据。例如&#xff0c;A向B发…

动态规划(记忆化搜索)

AcWing 901. 滑雪 给定一个 R行 C 列的矩阵&#xff0c;表示一个矩形网格滑雪场。 矩阵中第 i 行第 j 列的点表示滑雪场的第 i 行第 j 列区域的高度。 一个人从滑雪场中的某个区域内出发&#xff0c;每次可以向上下左右任意一个方向滑动一个单位距离。 当然&#xff0c;一个人能…

git更新代码时显示“auto-detection of host provider took too long“移除方法

git更新代码时显示"auto-detection of host provider took too long"移除方法 问题描述 在windows操作系统&#xff0c;未连接互连网电脑&#xff0c;更新内网代码库时显示“auto-detection of host provider took too long (>2000ms)”&#xff0c;如下图所示。…

idea 中配置 maven

前文叙述&#xff1a; 配置 maven 一共要设置两个地方&#xff1a;1、为当前项目设置2、为新项目设置maven 的下载和安装可参考我之前写过的文章&#xff0c;具体的配置文章中也都有讲解。1、为当前项目进行 maven 配置 配置 VM Options: -DarchetypeCataloginternal2、为新项…

设将n(n>1)个整数存放在一维数组R中。设计一个在时间和空间两方面都尽可能高效的算法。将R中保存的序列循环左移P(0<P<n)个位置

设将n&#xff08;n>1&#xff09;个整数存放在一维数组R中。设计一个在时间和空间两方面都尽可能高效的算法。将R中保存的序列循环左移P&#xff08;0<P<n&#xff09;个位置。即将R中保存的数据由&#xff08;x0,x1…,xn-1&#xff09;变为&#xff08;xp,xp1…x0,x…

vscode下ssh免密登录linux服务器

vscode使用ssh免密登录linux 1、安装SSH插件2、生成密钥3、linux安装ssh服务4、linux下配置公钥5、vscode远程登录 注&#xff1a;测试环境为window10Ubuntu1804/Ubuntu2204 1、安装SSH插件 扩展->搜索SSH->点击install进行安装&#xff0c;如下图所示&#xff1a; 2、…

【Linux】安装部署Redis

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ Redis安装部署linux 1.gcc编译环境2.c库环境3.…

记src-Juniper

一、hunter搜索web.title”Juniper Web Device Manager”&#xff0c;查找到香港的一处资产。 二、访问相关网页&#xff0c;界面就是Juniper登录界面。 三、根据编号为CVE-2023-36845的利用方法&#xff0c;抓包&#xff0c;修改提交方式&#xff0c;成功读取到文件。 原始包…

09. 主频和时钟配置

09. 主频和时钟配置 硬件原理图分析7路PLL时钟源时钟树简介内核时钟系统主频设置CCM_CACRRCCSRCCM_ANALOG_PLL_ARMn代码实例 PFD时钟设置PLL2的4路设置PLL3的4路设置代码实例 AHB、IPG和PERCLK根时钟设置AHB_CLK_ROOT 和 IPG_CLK_ROOTPERCLK_CLK_ROOTCCM_CBCDRCCM_CBCMRCCM_CSC…