Preface
With the goal of learning the hudi-flink source code, I debugged the code from my earlier article "Hudi Flink SQL Code Examples and Local Debugging", recording the main steps of the debugging session together with the corresponding source snippets.
Versions
- Flink 1.15.4
- Hudi 0.13.0
Goal
As mentioned in "Hudi Flink SQL Code Examples and Local Debugging": the Table API entry point looks much like the DataStream API one. The DataStream API enters through the sink and source methods of HoodiePipeline, and those two methods respectively call HoodieTableFactory.createDynamicTableSink and HoodieTableFactory.createDynamicTableSource. So how does Table API code step its way to createDynamicTableSink and createDynamicTableSource? And after a HoodieTableSink is returned, how does the data actually get written? The main entry point of Hudi's write logic appears to live in the method body of HoodieTableSink.getSinkRuntimeProvider, and I had never worked these questions out before. So the goal this time is to figure out:
1. the main code steps from the Table API entry point to createDynamicTableSink returning a HoodieTableSink;
2. where the method body of HoodieTableSink.getSinkRuntimeProvider is invoked to run the subsequent Hudi write logic.
Related classes:
- HoodiePipeline (DataStream API)
- HoodieTableFactory
- HoodieTableSink
- DataStreamSinkProviderAdapter (functional interface)
- TableEnvironmentImpl
- BatchPlanner
- PlannerBase
- FactoryUtil
- BatchExecSink
- CommonExecSink
DataStream API
The questions above are actually easy to answer from the DataStream API code, so let's first look at the DataStream API code for writing to Hudi. The full code is in the article "Flink Hudi DataStream API Code Examples":
DataStream<RowData> dataStream = env.fromElements(
GenericRowData.of(1, StringData.fromString("hudi1"), 1.1, 1000L, StringData.fromString("2023-04-07")),
GenericRowData.of(2, StringData.fromString("hudi2"), 2.2, 2000L, StringData.fromString("2023-04-08"))
);
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
.column("id int")
.column("name string")
.column("price double")
.column("ts bigint")
.column("dt string")
.pk("id")
.partition("dt")
.options(options);
builder.sink(dataStream, false);
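Here env, targetTable, and options come from the referenced article. For context, a minimal sketch of that setup might look like the following; the concrete values (checkpoint interval, table path, chosen options) are placeholder assumptions, not taken from the original article:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // streaming writes to Hudi commit on checkpoint completion

String targetTable = "t1";
Map<String, String> options = new HashMap<>();
// 'path' is mandatory; see the "Option [path] should not be empty" check in HoodieTableFactory.createDynamicTableSink below
options.put(FlinkOptions.PATH.key(), "file:///tmp/hudi/t1");
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");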
HoodiePipeline.Builder.sink
public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
TableDescriptor tableDescriptor = getTableDescriptor();
return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
}
HoodiePipeline.sink
private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
.consumeDataStream(input);
}
HoodiePipeline.sink already contains the answers:
1. HoodieTableFactory.createDynamicTableSink returns a HoodieTableSink.
2. HoodieTableSink.getSinkRuntimeProvider returns a DataStreamSinkProviderAdapter.
3. DataStreamSinkProviderAdapter.consumeDataStream invokes the method body of HoodieTableSink.getSinkRuntimeProvider to run the subsequent Hudi write logic. The dataStream here is the DataStream<RowData> we created at the very beginning of the program.
HoodieTableSink.getSinkRuntimeProvider
getSinkRuntimeProvider returns a DataStreamSinkProviderAdapter, whose lambda expression dataStream -> {...} is the concrete implementation of DataStreamSinkProviderAdapter.consumeDataStream(dataStream):
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return (DataStreamSinkProviderAdapter) dataStream -> {
// setup configuration
long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
// set up default parallelism
OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());
RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
// bulk_insert mode
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
return Pipelines.bulkInsert(conf, rowType, dataStream);
}
// Append mode
if (OptionsResolver.isAppendMode(conf)) {
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
if (OptionsResolver.needsAsyncClustering(conf)) {
return Pipelines.cluster(conf, rowType, pipeline);
} else {
return Pipelines.dummySink(pipeline);
}
}
DataStream<Object> pipeline;
// bootstrap
final DataStream<HoodieRecord> hoodieRecordDataStream =
Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
// write pipeline
pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
// compaction
if (OptionsResolver.needsAsyncCompaction(conf)) {
// use synchronous compaction for bounded source.
if (context.isBounded()) {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
}
return Pipelines.compact(conf, pipeline);
} else {
return Pipelines.clean(conf, pipeline);
}
};
}
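Which of these branches is taken is controlled purely by the table options. For instance, to route the method body above into the bulk_insert branch, it should be enough to add the write operation to the options map before building the pipeline (a sketch; FlinkOptions.OPERATION is the option read into writeOperation above):
// route getSinkRuntimeProvider into the bulk_insert branch (sketch)
options.put(FlinkOptions.OPERATION.key(), WriteOperationType.BULK_INSERT.value());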
DataStreamSinkProviderAdapter is in fact a functional interface, i.e. an interface with exactly one abstract method. A lambda expression can be assigned to a functional interface, which instantiates that interface:
public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
@Override
default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
return consumeDataStream(dataStream);
}
}
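To see both points in isolation — a lambda instantiating a functional interface, and the lambda body staying dormant until the single abstract method is called — here is a minimal self-contained demo of my own (unrelated to Hudi's actual classes), mirroring how the lambda inside HoodieTableSink.getSinkRuntimeProvider only runs when consumeDataStream(dataStream) is invoked:
public class FunctionalInterfaceDemo {

    @FunctionalInterface
    interface SinkProvider {
        String consume(String input); // the single abstract method
    }

    public static void main(String[] args) {
        // Assigning the lambda instantiates SinkProvider; nothing is printed yet.
        SinkProvider provider = input -> {
            System.out.println("lambda body executed");
            return "sunk: " + input;
        };
        System.out.println("provider created");
        // Only this call executes the lambda body.
        System.out.println(provider.consume("rows"));
        // Output order: provider created / lambda body executed / sunk: rows
    }
}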
For background on functional interfaces and lambda expressions, see these two articles:
https://it.sohu.com/a/682888110_100123073
https://blog.csdn.net/Speechless_/article/details/123746047
Table API
Now that the DataStream API call steps are clear, let's compare them against the rough call steps of the Table API. The debugging entry point:
tableEnv.executeSql(String.format("insert into %s values (1,'hudi',10,100,'2023-05-28')", tableName));
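Here tableEnv and tableName come from the referenced article. A sketch of the assumed setup — batch mode, which is why the planner below turns out to be BatchPlanner, and a 'connector'='hudi' DDL, which is how FactoryUtil later locates HoodieTableFactory (table name and path are placeholders):
EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String tableName = "t1";
tableEnv.executeSql(
    "create table " + tableName + " (\n"
        + "  id int,\n"
        + "  name string,\n"
        + "  price double,\n"
        + "  ts bigint,\n"
        + "  dt string,\n"
        + "  primary key (id) not enforced\n"
        + ") partitioned by (dt) with (\n"
        + "  'connector' = 'hudi',\n" // resolved to HoodieTableFactory during factory discovery
        + "  'path' = 'file:///tmp/hudi/t1'\n"
        + ")");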
Overall call flow
1. tableEnv.executeSql -> TableEnvironmentImpl.executeSql -> executeInternal(Operation operation) -> executeInternal(List<ModifyOperation> operations) -> this.translate -> (PlannerBase)this.planner.translate
2.1 PlannerBase.translate -> PlannerBase.translateToRel -> getTableSink(catalogSink.getContextResolvedTable, dynamicOptions) -> FactoryUtil.createDynamicTableSink -> HoodieTableFactory.createDynamicTableSink
2.2 PlannerBase.translate -> (BatchPlanner)translateToPlan(execGraph) -> (ExecNodeBase)node.translateToPlan -> (BatchExecSink)translateToPlanInternal -> (CommonExecSink)createSinkTransformation -> (HoodieTableSink)getSinkRuntimeProvider -> (CommonExecSink)applySinkProvider -> provider.consumeDataStream
Detailed code
TableEnvironmentImpl
(TableEnvironmentImpl)executeSql
public TableResult executeSql(String statement) {
List<Operation> operations = this.getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! executeSql() only accepts a single SQL statement of type CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONSCREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE, USE MODULES, SHOW [FULL] MODULES.");
} else {
// Key step: executeInternal
return this.executeInternal((Operation)operations.get(0));
}
}
executeInternal(Operation operation)
public TableResultInternal executeInternal(Operation operation) {
if (operation instanceof ModifyOperation) {
// Key step: executeInternal
return this.executeInternal(Collections.singletonList((ModifyOperation)operation));
} else if (operation instanceof StatementSetOperation) {
return this.executeInternal(((StatementSetOperation)operation).getOperations());
...
executeInternal(List<ModifyOperation> operations)
public TableResultInternal executeInternal(List<ModifyOperation> operations) {
// Key step: translate
List<Transformation<?>> transformations = this.translate(operations);
List<String> sinkIdentifierNames = this.extractSinkIdentifierNames(operations);
TableResultInternal result = this.executeInternal(transformations, sinkIdentifierNames);
if ((Boolean)this.tableConfig.get(TableConfigOptions.TABLE_DML_SYNC)) {
try {
result.await();
} catch (ExecutionException | InterruptedException var6) {
result.getJobClient().ifPresent(JobClient::cancel);
throw new TableException("Fail to wait execution finish.", var6);
}
}
return result;
}
translate
The planner here is a BatchPlanner, because we configured batch mode via EnvironmentSettings.inBatchMode().
protected List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
// the planner here is BatchPlanner, because batch mode was set via EnvironmentSettings.inBatchMode()
// Key step: PlannerBase.translate
return this.planner.translate(modifyOperations);
}
BatchPlanner
PlannerBase.translate (parent class of BatchPlanner)
override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
beforeTranslation()
if (modifyOperations.isEmpty) {
return List.empty[Transformation[_]]
}
// Key step: translateToRel
val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
// Key step: translateToPlan
val transformations = translateToPlan(execGraph)
afterTranslation()
transformations
}
PlannerBase.translateToRel
private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
val dataTypeFactory = catalogManager.getDataTypeFactory
modifyOperation match {
case s: UnregisteredSinkModifyOperation[_] =>
val input = getRelBuilder.queryOperation(s.getChild).build()
val sinkSchema = s.getSink.getTableSchema
// validate query schema and sink schema, and apply cast if possible
val query = validateSchemaAndApplyImplicitCast(
input,
catalogManager.getSchemaResolver.resolve(sinkSchema.toSchema),
null,
dataTypeFactory,
getTypeFactory)
LogicalLegacySink.create(
query,
s.getSink,
"UnregisteredSink",
ConnectorCatalogTable.sink(s.getSink, !isStreamingMode))
case collectModifyOperation: CollectModifyOperation =>
val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
DynamicSinkUtils.convertCollectToRel(
getRelBuilder,
input,
collectModifyOperation,
getTableConfig,
getFlinkContext.getClassLoader
)
case catalogSink: SinkModifyOperation =>
val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
val dynamicOptions = catalogSink.getDynamicOptions
// Key step: getTableSink
getTableSink(catalogSink.getContextResolvedTable, dynamicOptions).map {
case (table, sink: TableSink[_]) =>
// Legacy tables can't be anonymous
val identifier = catalogSink.getContextResolvedTable.getIdentifier
// check the logical field type and physical field type are compatible
val queryLogicalType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
// validate logical schema and physical schema are compatible
validateLogicalPhysicalTypesCompatible(table, sink, queryLogicalType)
// validate TableSink
validateTableSink(catalogSink, identifier, sink, table.getPartitionKeys)
// validate query schema and sink schema, and apply cast if possible
val query = validateSchemaAndApplyImplicitCast(
input,
table.getResolvedSchema,
identifier.asSummaryString,
dataTypeFactory,
getTypeFactory)
val hints = new util.ArrayList[RelHint]
if (!dynamicOptions.isEmpty) {
hints.add(RelHint.builder("OPTIONS").hintOptions(dynamicOptions).build)
}
LogicalLegacySink.create(
query,
hints,
sink,
identifier.toString,
table,
catalogSink.getStaticPartitions.toMap)
case (table, sink: DynamicTableSink) =>
DynamicSinkUtils.convertSinkToRel(getRelBuilder, input, catalogSink, sink)
} match {
case Some(sinkRel) => sinkRel
case None =>
throw new TableException(
s"Sink '${catalogSink.getContextResolvedTable}' does not exists")
}
PlannerBase.getTableSink
private def getTableSink(
contextResolvedTable: ContextResolvedTable,
dynamicOptions: JMap[String, String]): Option[(ResolvedCatalogTable, Any)] = {
contextResolvedTable.getTable[CatalogBaseTable] match {
case connectorTable: ConnectorCatalogTable[_, _] =>
val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
toScala(connectorTable.getTableSink) match {
case Some(sink) => Some(resolvedTable, sink)
case None => None
}
case regularTable: CatalogTable =>
val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
...
if (
!contextResolvedTable.isAnonymous &&
TableFactoryUtil.isLegacyConnectorOptions(
catalogManager.getCatalog(objectIdentifier.getCatalogName).orElse(null),
tableConfig,
isStreamingMode,
objectIdentifier,
resolvedTable.getOrigin,
isTemporary
)
) {
...
} else {
...
// Key step: FactoryUtil.createDynamicTableSink
val tableSink = FactoryUtil.createDynamicTableSink(
factory,
objectIdentifier,
tableToFind,
Collections.emptyMap(),
getTableConfig,
getFlinkContext.getClassLoader,
isTemporary)
Option(resolvedTable, tableSink)
}
case _ => None
}
FactoryUtil.createDynamicTableSink
Based on 'connector'='hudi', the factory resolves to org.apache.hudi.table.HoodieTableFactory, and HoodieTableFactory.createDynamicTableSink is then called:
public static DynamicTableSink createDynamicTableSink(
@Nullable DynamicTableSinkFactory preferredFactory,
ObjectIdentifier objectIdentifier,
ResolvedCatalogTable catalogTable,
Map<String, String> enrichmentOptions,
ReadableConfig configuration,
ClassLoader classLoader,
boolean isTemporary) {
final DefaultDynamicTableContext context =
new DefaultDynamicTableContext(
objectIdentifier,
catalogTable,
enrichmentOptions,
configuration,
classLoader,
isTemporary);
try {
// 'connector'='hudi'
// org.apache.hudi.table.HoodieTableFactory
final DynamicTableSinkFactory factory =
preferredFactory != null
? preferredFactory
: discoverTableFactory(DynamicTableSinkFactory.class, context);
// Key step: HoodieTableFactory.createDynamicTableSink
return factory.createDynamicTableSink(context);
} catch (Throwable t) {
throw new ValidationException(
String.format(
"Unable to create a sink for writing table '%s'.\n\n"
+ "Table options are:\n\n"
+ "%s",
objectIdentifier.asSummaryString(),
catalogTable.getOptions().entrySet().stream()
.map(e -> stringifyOption(e.getKey(), e.getValue()))
.sorted()
.collect(Collectors.joining("\n"))),
t);
}
}
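discoverTableFactory locates the factory via Java's ServiceLoader: every Factory implementation on the classpath is registered under META-INF/services/org.apache.flink.table.factories.Factory, and the 'connector' option value is matched against each factory's factoryIdentifier(). HoodieTableFactory registers itself this way with the identifier "hudi". A minimal illustrative sketch of that contract (my own example, not Hudi's actual source):
import java.util.Collections;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;

// Registered via META-INF/services/org.apache.flink.table.factories.Factory
public class MySinkFactory implements DynamicTableSinkFactory {

    @Override
    public String factoryIdentifier() {
        return "my-connector"; // matched against the 'connector' table option
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        throw new UnsupportedOperationException("sketch only");
    }
}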
HoodieTableFactory.createDynamicTableSink
This answers the first question:
public DynamicTableSink createDynamicTableSink(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
"Option [path] should not be empty.");
setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
// Key step: return new HoodieTableSink
return new HoodieTableSink(conf, schema);
}
BatchExecSink
Back in PlannerBase.translate, translateToPlan is invoked later on. execGraph.getRootNodes returns a BatchExecSink (to see why it is a BatchExecSink, look at the translateToExecNodeGraph method called inside PlannerBase.translate). Since BatchExecSink is a subclass of BatchExecNode, node.translateToPlan is executed.
PlannerBase.translateToPlan
override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
beforeTranslation()
val planner = createDummyPlanner()
val transformations = execGraph.getRootNodes.map {
// BatchExecSink
// Key step: ExecNodeBase.translateToPlan
case node: BatchExecNode[_] => node.translateToPlan(planner)
case _ =>
throw new TableException(
"Cannot generate BoundedStream due to an invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
}
afterTranslation()
transformations
}
BatchExecSink
public class BatchExecSink extends CommonExecSink implements BatchExecNode<Object> {
...
public abstract class CommonExecSink extends ExecNodeBase<Object>
implements MultipleTransformationTranslator<Object> {
...
ExecNodeBase.translateToPlan
public final Transformation<T> translateToPlan(Planner planner) {
if (transformation == null) {
transformation =
// Key step: BatchExecSink.translateToPlanInternal
translateToPlanInternal(
(PlannerBase) planner,
ExecNodeConfig.of(
((PlannerBase) planner).getTableConfig(),
persistedConfig,
isCompiled));
if (this instanceof SingleTransformationTranslator) {
if (inputsContainSingleton()) {
transformation.setParallelism(1);
transformation.setMaxParallelism(1);
}
}
}
return transformation;
}
BatchExecSink.translateToPlanInternal
protected Transformation<Object> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
final Transformation<RowData> inputTransform =
(Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner);
// org.apache.hudi.table.HoodieTableSink
final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner.getFlinkContext());
// Key step: CommonExecSink.createSinkTransformation
return createSinkTransformation(
planner.getExecEnv(), config, inputTransform, tableSink, -1, false);
}
CommonExecSink.createSinkTransformation
Here tableSink is the HoodieTableSink; calling its getSinkRuntimeProvider returns the runtimeProvider without executing the lambda body inside:
protected Transformation<Object> createSinkTransformation(
StreamExecutionEnvironment streamExecEnv,
ExecNodeConfig config,
Transformation<RowData> inputTransform,
// tableSink here is the HoodieTableSink
DynamicTableSink tableSink,
int rowtimeFieldIndex,
boolean upsertMaterialize) {
final ResolvedSchema schema = tableSinkSpec.getContextResolvedTable().getResolvedSchema();
final SinkRuntimeProvider runtimeProvider =
// Key step: HoodieTableSink.getSinkRuntimeProvider
tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
final RowType physicalRowType = getPhysicalRowType(schema);
final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema);
final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider);
final int inputParallelism = inputTransform.getParallelism();
final boolean inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT);
final boolean hasPk = primaryKeys.length > 0;
...
return (Transformation<Object>)
// Key step: CommonExecSink.applySinkProvider
applySinkProvider(
sinkTransform,
streamExecEnv,
runtimeProvider,
rowtimeFieldIndex,
sinkParallelism,
config);
}
CommonExecSink.applySinkProvider
A dataStream is first created via new DataStream<>(env, sinkTransformation); then provider.consumeDataStream is executed, which invokes the method body of HoodieTableSink.getSinkRuntimeProvider. The provider here is the DataStreamSinkProviderAdapter returned by HoodieTableSink.getSinkRuntimeProvider:
private Transformation<?> applySinkProvider(
Transformation<RowData> inputTransform,
StreamExecutionEnvironment env,
SinkRuntimeProvider runtimeProvider,
int rowtimeFieldIndex,
int sinkParallelism,
ExecNodeConfig config) {
TransformationMetadata sinkMeta = createTransformationMeta(SINK_TRANSFORMATION, config);
if (runtimeProvider instanceof DataStreamSinkProvider) {
Transformation<RowData> sinkTransformation =
applyRowtimeTransformation(
inputTransform, rowtimeFieldIndex, sinkParallelism, config);
// build the dataStream
final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
// Key step: provider.consumeDataStream
return provider.consumeDataStream(createProviderContext(config), dataStream)
.getTransformation();
} else if (runtimeProvider instanceof TransformationSinkProvider) {
...
provider.consumeDataStream
As already covered for DataStreamSinkProviderAdapter above, this invokes the method body (the lambda expression) of HoodieTableSink.getSinkRuntimeProvider to run the subsequent Hudi write logic. This answers the second question:
default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
return consumeDataStream(dataStream);
}
Summary
This post is a brief record of my debugging session through the Hudi Flink SQL source code rather than a deep analysis of it (which is beyond me for now). The main goal was to clarify the code path from the Table API entry point to createDynamicTableSink returning a HoodieTableSink, and to pin down where the method body of HoodieTableSink.getSinkRuntimeProvider is invoked to run the subsequent Hudi write logic; this should make later analysis and study of the Hudi source easier.
New knowledge picked up in this post: functional interfaces and how lambda expressions implement them.