1、Flink 扩展 calcite 中的语法解析
1)定义需要的 SqlNode 节点类-以 SqlShowCatalogs 为例
a)类位置
flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java
核心方法:
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("SHOW CATALOGS");
}
b)类血缘
2)修改 includes 目录下的 .ftl 文件,在 parserImpls.ftl 文件中添加语法逻辑
a)文件位置
b)语法示例
/**
* Parse a "Show Catalogs" metadata query command.
*/
SqlShowCatalogs SqlShowCatalogs() :
{
}
{
<SHOW> <CATALOGS>
{
return new SqlShowCatalogs(getPos());
}
}
3)将 Calcite 源码中的 config.fmpp 文件复制到项目的 src/main/codegen 目录下,修改内容,来声明扩展的部分
a)文件位置
b)config.fmpp 内容
data: {
# 解析器文件路径
parser: tdd(../data/Parser.tdd)
}
# 扩展文件的目录
freemarkerLinks: {
includes: includes/
}
c)Parser.tdd 部分内容
# 生成的解析器包路径
package: "org.apache.flink.sql.parser.impl",
# 解析器名称
class: "FlinkSqlParserImpl",
# 引入的依赖类
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
# 新的关键字
keywords: [
"CATALOGS"
]
# 新增的语法解析方法
statementParserMethods: [
"SqlShowCatalogs()"
]
# 包含的扩展语法文件
implementationFiles: [
"parserImpls.ftl"
]
4)编译模板文件和语法文件
5)配置扩展的解析器类
withParserFactory(FlinkSqlParserImpl.FACTORY)
2、自定义扩展 Flink 的 Parser 语法
1)定义 SqlNode 类
package org.apache.flink.sql.parser.dql;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import java.util.Collections;
import java.util.List;
/** XSHOW CATALOGS sql call. */
public class SqlXShowCatalogs extends SqlCall {
public static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("XSHOW CATALOGS", SqlKind.OTHER);
public SqlXShowCatalogs(SqlParserPos pos) {
super(pos);
}
@Override
public SqlOperator getOperator() {
return OPERATOR;
}
@Override
public List<SqlNode> getOperandList() {
return Collections.emptyList();
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("XSHOW CATALOGS");
}
}
2)修改 includes 目录下的 parserImpls.ftl 文件
/**
* Parse a "XShow Catalogs" metadata query command.
*/
SqlXShowCatalogs SqlXShowCatalogs() :
{
}
{
<XSHOW> <CATALOGS>
{
return new SqlXShowCatalogs(getPos());
}
}
3)修改 Parser.tdd 文件,新增-声明拓展的部分
imports:
"org.apache.flink.sql.parser.dql.SqlXShowCatalogs"
keywords:
"XSHOW"
statementParserMethods:
"SqlXShowCatalogs()"
4)重新编译
mvn generate-resources
5)执行测试用例
可以看到,自定义 SQL 的报错,由解析失败,变为了校验失败。
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class CustomFlinkSql {
public static void main(String[] args) throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
.useBlinkPlanner()
.build());
// 拓展自定义语法 xshow catalogs 前
// SQL parse failed. Non-query expression encountered in illegal context
tEnv.executeSql("xshow catalogs").print();
// 拓展自定义语法 xshow catalogs 后
// SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
}
}
6)查看生成的扩展解析器类
可以看到,在 FlinkSqlParserImpl 中,自定义的解析语法已经生成了。
3、validate 概述
在向 Flink 中添加完自定义的解析规则后,报错信息如下:
SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
修改 validate 部分的代码
1)FlinkPlannerImpl#validate
作用:校验 SqlNode ,如果是 show catalogs 语法时直接返回。
sqlNode.isInstanceOf[SqlXShowCatalogs]
2)SqlToOperationConverter#convert
作用:将校验过的 SqlNode 转换为 Operator。
else if (validated instanceof SqlXShowCatalogs) {
return Optional.of(converter.convertXShowCatalogs((SqlXShowCatalogs) validated));
}
3)SqlToOperationConverter#convertXShowCatalogs
/** Convert SHOW CATALOGS statement. */
private Operation convertXShowCatalogs(SqlXShowCatalogs sqlXShowCatalogs) {
return new XShowCatalogsOperation();
}
4)XShowCatalogsOperation
package org.apache.flink.table.operations;
public class XShowCatalogsOperation implements ShowOperation {
@Override
public String asSummaryString() {
return "SHOW CATALOGS";
}
}
4、执行测试用例
package org.apache.flink.table.examples.java.custom;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class CustomFlinkSql {
public static void main(String[] args) throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
.useBlinkPlanner()
.build());
// FlinkSQL原本支持的语法
tEnv.executeSql("show catalogs").print();
// 自定义语法
tEnv.executeSql("xshow catalogs").print();
}
}
5、总结-FlinkSQL 的执行流程
1、对 SQL 进行校验
final SqlNode validated = flinkPlanner.validate(sqlNode);
2、预校验重写 Insert 语句
3、调用 SqlNode.validate() 进行校验
1)如果是:ExtendedSqlNode【SqlCreateHiveTable、SqlCreateTable、SqlTableLike】
2)如果是:SqlKind.DDL、SqlKind.INSERT 等,无需校验,直接返回 SqlNode
3)如果是:SqlRichExplain
4)其它:validator.validate(sqlNode)
1.校验作用域和表达式:validateScopedExpression(topNode, scope)
a)将 SqlNode 进行规范化重写
b)如果SQL是【TOP_LEVEL = concat(QUERY, DML, DDL)】,则在父作用域中注册查询
c)校验 validateQuery
i)validateFeature
ii)validateNamespace
iii)validateModality
iv)validateAccess
v)validateSnapshot
d)如果SQL不是【TOP_LEVEL = concat(QUERY, DML, DDL)】进行类型推导
2.获取校验之后的节点类型
2、将 SQLNode 转换为 Operation
converter.convertSqlQuery(validated)
1)生成逻辑执行计划 RelNode
RelRoot relational = planner.rel(validated);
1.对查询进行转换
sqlToRelConverter.convertQuery(validatedSqlNode)
2)创建 PlannerQueryOperation
new PlannerQueryOperation(relational.project());
3、将 Operation 转换为 List<Transformation<?>>
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));
1)对 RelNode 逻辑执行计划进行优化,获取 optimizedRelNodes
val optimizedRelNodes = optimize(relNodes)
2)将 optimizedRelNodes 转换为 execGraph
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
3)将 execGraph 转换为 transformations
1.使用代码生成技术生成Function,后续可以反射调用
val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)