Flink 自定义拓展 SQL 语法

news2025/1/15 20:46:29
1、Flink 扩展 calcite 中的语法解析
1)定义需要的 SqlNode 节点类-以 SqlShowCatalogs 为例
a)类位置

flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java

在这里插入图片描述

核心方法

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("SHOW CATALOGS");
    }
b)类血缘

在这里插入图片描述

2)修改 includes 目录下的 .ftl 文件,在 parserImpls.ftl 文件中添加语法逻辑
a)文件位置

在这里插入图片描述

b)语法示例
/**
* Parse a "Show Catalogs" metadata query command.
*/
SqlShowCatalogs SqlShowCatalogs() :
{
}
{
    <SHOW> <CATALOGS>
    {
        return new SqlShowCatalogs(getPos());
    }
}
3)将 Calcite 源码中的 config.fmpp 文件复制到项目的 src/main/codegen 目录下,修改内容,来声明扩展的部分
a)文件位置

在这里插入图片描述

b)config.fmpp 内容
data: {
	# 解析器文件路径
  parser: tdd(../data/Parser.tdd)
}

# 扩展文件的目录
freemarkerLinks: {
  includes: includes/
}
c)Parser.tdd 部分内容
# 生成的解析器包路径
package: "org.apache.flink.sql.parser.impl",
# 解析器名称
class: "FlinkSqlParserImpl",
# 引入的依赖类
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
# 新的关键字
keywords: [
    "CATALOGS"
  ]
# 新增的语法解析方法
statementParserMethods: [
    "SqlShowCatalogs()"
  ]
# 包含的扩展语法文件
implementationFiles: [
    "parserImpls.ftl"
  ]
4)编译模板文件和语法文件

在这里插入图片描述

5)配置扩展的解析器类
withParserFactory(FlinkSqlParserImpl.FACTORY)
2、自定义扩展 Flink 的 Parser 语法
1)定义 SqlNode 类
package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** XSHOW CATALOGS sql call. */
public class SqlXShowCatalogs extends SqlCall {
    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("XSHOW CATALOGS", SqlKind.OTHER);

    public SqlXShowCatalogs(SqlParserPos pos) {
        super(pos);
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
        return Collections.emptyList();
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("XSHOW CATALOGS");
    }
}

2)修改 includes 目录下的 parserImpls.ftl 文件
/**
* Parse a "XShow Catalogs" metadata query command.
*/
SqlXShowCatalogs SqlXShowCatalogs() :
{
}
{
    <XSHOW> <CATALOGS>
    {
       return new SqlXShowCatalogs(getPos());
    }
}
3)修改 Parser.tdd 文件,新增-声明拓展的部分
imports:

"org.apache.flink.sql.parser.dql.SqlXShowCatalogs"

keywords:

"XSHOW"

statementParserMethods:

"SqlXShowCatalogs()"
4)重新编译
 mvn generate-resources
5)执行测试用例

可以看到,自定义 SQL 的报错,由解析失败,变为了校验失败。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {
    public static void main(String[] args) throws Exception {

        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build());
				
				// 拓展自定义语法 xshow catalogs 前
        // SQL parse failed. Non-query expression encountered in illegal context
        tEnv.executeSql("xshow catalogs").print();

        // 拓展自定义语法 xshow catalogs 后
        // SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
    }
}

6)查看生成的扩展解析器类

可以看到,在 FlinkSqlParserImpl 中,自定义的解析语法已经生成了。

在这里插入图片描述

3、validate 概述

在向 Flink 中添加完自定义的解析规则后,报错信息如下:

SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
修改 validate 部分的代码
1)FlinkPlannerImpl#validate

作用:校验 SqlNode ,如果是 show catalogs 语法时直接返回。

在这里插入图片描述

sqlNode.isInstanceOf[SqlXShowCatalogs]
2)SqlToOperationConverter#convert

作用:将校验过的 SqlNode 转换为 Operator。

在这里插入图片描述

else if (validated instanceof SqlXShowCatalogs) {
            return Optional.of(converter.convertXShowCatalogs((SqlXShowCatalogs) validated));
}
3)SqlToOperationConverter#convertXShowCatalogs
/** Convert SHOW CATALOGS statement. */
private Operation convertXShowCatalogs(SqlXShowCatalogs sqlXShowCatalogs) {
     return new XShowCatalogsOperation();
}
4)XShowCatalogsOperation
package org.apache.flink.table.operations;

public class XShowCatalogsOperation implements ShowOperation {
    @Override
    public String asSummaryString() {
        return "SHOW CATALOGS";
    }
}
4、执行测试用例
package org.apache.flink.table.examples.java.custom;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {
    public static void main(String[] args) throws Exception {

        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build());

				// FlinkSQL原本支持的语法
        tEnv.executeSql("show catalogs").print();
        
        // 自定义语法
        tEnv.executeSql("xshow catalogs").print();
    }
}

在这里插入图片描述

5、总结-FlinkSQL 的执行流程
1、对 SQL 进行校验

final SqlNode validated = flinkPlanner.validate(sqlNode);

2、预校验重写 Insert 语句

3、调用 SqlNode.validate() 进行校验

	1)如果是:ExtendedSqlNode【SqlCreateHiveTable、SqlCreateTable、SqlTableLike】
	2)如果是:SqlKind.DDL、SqlKind.INSERT 等,无需校验,直接返回 SqlNode
	3)如果是:SqlRichExplain
	4)其它:validator.validate(sqlNode)
		
			1.校验作用域和表达式:validateScopedExpression(topNode, scope)
					a)将 SqlNode 进行规范化重写
          b)如果SQL是【TOP_LEVEL = concat(QUERY, DML, DDL)】,则在父作用域中注册查询
          c)校验 validateQuery 
          	i)validateFeature
          	ii)validateNamespace
          	iii)validateModality
          	iv)validateAccess
          	v)validateSnapshot
          d)如果SQL不是【TOP_LEVEL = concat(QUERY, DML, DDL)】进行类型推导
       
       2.获取校验之后的节点类型

2、将 SQLNode 转换为 Operation

converter.convertSqlQuery(validated)

	1)生成逻辑执行计划 RelNode
	RelRoot relational = planner.rel(validated);
		
		1.对查询进行转换
		sqlToRelConverter.convertQuery(validatedSqlNode)
		
	2)创建 PlannerQueryOperation
	new PlannerQueryOperation(relational.project());
	
3、将 Operation 转换为 List<Transformation<?>>
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));

	1)对 RelNode 逻辑执行计划进行优化,获取 optimizedRelNodes
	val optimizedRelNodes = optimize(relNodes)
	
	2)将 optimizedRelNodes 转换为 execGraph
	val execGraph = translateToExecNodeGraph(optimizedRelNodes)
	
	3)将 execGraph 转换为 transformations
	
		1.使用代码生成技术生成Function,后续可以反射调用
		val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1134202.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM 调优JVM 性能优化

所谓“调优”就是一个诊断和处理手段&#xff0c;最终的目标是让系统的处理能力&#xff0c;也就是“性能”达到最优化。 计算机系统中&#xff0c;性能相关的资源主要分为这几类&#xff1a; CPU&#xff1a;CPU 是系统最关键的计算资源&#xff0c;在单位时间内有限&#xf…

Windows10系统安装telnet命令

简介 telnet命令可以测试目标服务器端口是否开通&#xff0c;使用命令 telnet ip地址 端口&#xff0c;输入命令后回车&#xff0c;如果进入输入状态&#xff0c;则表示目标服务器端口已开通&#xff0c;可以通过外网访问 Windows10系统安装步骤 1.打开控制面板 2.选择程序…

MS1112可替代ADS1112,16-bit 多输入内置基准模数转换器

MS1112 是一款高精度 16bit 模数转换器&#xff0c;具有 2 组差分输入 或 3 组单端输入通道&#xff0c;高达 16bits 的分辨率。内部集成 2.048V 基 准源&#xff0c;差分输入范围达到 2.048V 。 MS1112 使用了 I 2 C 兼容接口&#xff0c; 并有 2 个地址管…

Linux网络流量监控iftop

在 Linux 系统下即时监控服务器的网络带宽使用情况&#xff0c;有很多工具&#xff0c;比如 iptraf、nethogs 等等&#xff0c;但是推荐使用小巧但功能很强大的 iftop 工具【官网&#xff1a;http://www.ex-parrot.com/~pdw/iftop/】。iftop 是 Linux 系统一个免费的网卡实时流…

elasticsearch-5.6.15集群部署,如何部署x-pack并添加安全认证

目录 一、环境 1、JDK、映射、域名、三墙 2、三台服务器创建用户、并为用户授权 二、配置elasticsearch-5.6.15实例 1、官网获取elasticsearch-5.6.15.tar.gz&#xff0c;拉取到三台服务器 2、elas环境准备 3、修改elasticsearch.yml配置 4、修改软、硬件线程数 5、修改…

PHP 危险函数1-OS 命令执行函数

OS 命令执行函数 system 函数 特点&#xff1a; 自带输出功能自动区分系统平台&#xff0c;只需得知目标系统在 system() 中输入对应系统的命令即可 使用 <?phpsystem("<系统命令>"); ?>如 <pre> <?phpsystem("ipconfig"); …

如何在VScode中让printf输出中文

如何在VScode中让printf输出中文&#xff1f; 1、在“Visual Studio Code”图标上右击&#xff0c;弹出对话框。见下图&#xff1a; 2、点击“以管理员身份运行”&#xff0c;得到下图&#xff1a; 3、点击“UTF-8”按钮&#xff0c;得到下图&#xff1a; 4、点击“通过编码重…

机器学习实验三:决策树-隐形眼镜分类(判断视力程度)

决策树-隐形眼镜分类&#xff08;判断视力程度&#xff09; Title : 使用决策树预测隐形眼镜类型 # Description :隐形眼镜数据是非常著名的数据集 &#xff0c;它包含很多患者眼部状况的观察条件以及医生推荐的隐形眼镜类型 。 # 隐形眼镜类型包括硬材质 、软材质以及不适合佩…

MFC 注册表

文章目录 打开注册表对注册表的键的操作创建子键删除子键遍历子键 对注册表值的操作读取值设置值 打开注册表 void CREGDemoDlg::OnBnClickedBtnOpen() {//打开注册表HKEY hKey;if (ERROR_SUCCESS ! RegOpenKeyEx(HKEY_CURRENT_USER, L"SOFTWARE\\Baidu.com", 0, KEY…

*Django中的Ajax 纯js的书写样式1

搭建项目 建立一个Djano项目&#xff0c;建立一个app&#xff0c;建立路径&#xff0c;视图函数大多为render, Ajax的创建 urls.py path(index/,views.index), path(index2/,views.index2), views.py def index(request):return render(request,01.html) def index2(requ…

【Java集合类面试二十三】、List和Set有什么区别?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;List和Set有什么区别&am…

【Java集合类面试二十五】、有哪些线程安全的List?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;有哪些线程安全的List&a…

Controller接收Postman的raw参数时,属性值全部为空

Controller接收Postman的raw参数时&#xff0c;属性值全部为空 情景再现 在进行业务代码的编写过程中&#xff0c;使用Postman等工具调用Controller接口时&#xff0c;发现属性值全部为空后端代码如下&#xff1a; Requset对象为&#xff1a; public class QuerySkuRequest …

十八、字符串(4)

本章概要 扫描输入 Scanner 分隔符用正则表达式扫描 StringTokenizer 类 扫描输入 到目前为止&#xff0c;从文件或标准输入读取数据还是一件相当痛苦的事情。一般的解决办法就是读入一行文本&#xff0c;对其进行分词&#xff0c;然后使用 Integer、Double 等类的各种解析方…

【Java】电子病历编辑器源码(云端SaaS服务)

电子病历编辑器极具灵活性&#xff0c;它既可嵌入到医院HIS系统中&#xff0c;作为内置编辑工具供多个模块使用&#xff0c;也可以独立拿出来&#xff0c;与第三方业务厂商展开合作&#xff0c;为他们提供病历书写功能&#xff0c;充分发挥编辑器的功能。 电子病历基于云端SaaS…

机器学习之查准率、查全率与F1

文章目录 查准率&#xff08;Precision&#xff09;&#xff1a;查全率&#xff08;Recall&#xff09;&#xff1a;F1分数&#xff08;F1 Score&#xff09;&#xff1a;实例P-R曲线F1度量python实现 查准率&#xff08;Precision&#xff09;&#xff1a; 定义&#xff1a; …

ORB-SLAM安装过程遇到问题记录整理

一、ORB-SLAM2 1.c error: ‘decay_t’ is not a member of ‘std’ 如下图所示&#xff1a; 解决方法&#xff1a; 修改 ORB_SLAM的 CMAKELIST.txt文件&#xff0c; 将set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdc11") 修改为 set(CMAKE_CXX_STANDARD 14) 2…

R语言的极值统计学、分位数回归、机器学习方法

受到气候变化、温室效应以及人类活动等因素的影响&#xff0c;自然界中极端高温、极端环境污染、大洪水和大暴雨等现象的发生日益频繁&#xff1b;在人类社会中&#xff0c;股市崩溃、金融危机等极端情况也时有发生&#xff1b;今年的新冠疫情就是非常典型的极端现象。研究此类…

2017年亚太杯APMCM数学建模大赛B题喷雾轨迹规划问题求解全过程文档及程序

2017年亚太杯APMCM数学建模大赛 B题 喷雾轨迹规划问题 原题再现 喷釉工艺用喷釉枪或喷釉机在压缩空气下将釉喷入雾中&#xff0c;使釉附着在泥体上。这是陶瓷生产过程中一个容易实现自动化的过程。由于不均匀的釉料在烧制过程中会产生裂纹&#xff0c;导致工件报废&#xff0…

065:mapboxGL在一个图层中随机添加100个标记(marker)

第065个 点击查看专栏目录 本示例的目的是演示如何在vue+mapbox中在一个图层上随机添加100个标记(marker)。要添加多个标记,或者向交互式 Web 或移动地图添加标记,通常必须提供 GeoJSON 格式或矢量切片集的点数据。 您可以在运行之前将数据添加到地图样式,方法是使用 Mapb…