文章目录
- 1.EventTime计算原理
- 2.案例使用
- 2.1 Maven pom 依赖
- 2.2 设置EventTime
- 2.3 Flink API
- 2.4 Flink SQL
- 2.5 读取EventTime
在某些业务场景下,我们需要一个标志来衡量hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据是否完整从而进一步写入分区数据进行分区级别的ETL,这也就是我们通常说的流转批。
1.EventTime计算原理
writer 算子, 负责数据写入文件, writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit算子中,然后commit算子从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入HUDI表的元数据中。
2.案例使用
将这个进度值(EventTime)存储为 hudi 提交(版本)元数据的属性里,然后通过访问这个元数据属性获取这个进度值。在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个监控任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。
如何解决乱序到来问题, 我们可以通过设置spedGapTime来设置允许延迟到来的范围默认是0 不会延迟到来。
2.1 Maven pom 依赖
<dependencies>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.13-bundle</artifactId>
<version>0.12.1</version>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.15-bundle</artifactId>
<version>0.12.1</version>
</dependency>
</dependencies>
2.2 设置EventTime
2.3 Flink API
用户只需要设置flink conf指定时间字段作为时间推进字段
Map<String, String> options = new HashMap<>();
// 这里省略其他表字段
options.put(FlinkOptions.EVENT_TIME_FIELD.key(), "ts");
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
.column("id int not null")
.column("ts string")
.column("dt string")
.pk("id")
.partition("dt")
.options(options);
2.4 Flink SQL
通过设置hoodie.payload.event.time.field指定需要计算的eventtime的字段
create table hudi_cow_01(\n" +
" uuid varchar(20),\n" +
" name varchar(10),\n" +
" age int,\n" +
" ts timestamp(3),\n" +
" PRIMARY KEY(uuid) NOT ENFORCED\n" +
")\n" +
" with (\n" +
// 这里省略其他参数
" 'hoodie.payload.event.time.field' = 'ts'\n"
")
2.5 读取EventTime
Spark SQL
call show_commit_extra_metadata(table => 'hudi_tauth_test.hudi_cow_01', metadata_key => 'hoodie.payload.event.time.field');
java API
Option<HoodieCommitMetadata> commitMetadataOption = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, currentInstant);
if (!commitMetadataOption.isPresent()) {
throw new HoodieException(String.format("Commit %s not found commitMetadata in Commits %s.", currentInstant, timeline));
}
// 获取到当前版本的时间进度
String eventTime = commitMetadataOption.get().getExtraMetadata().get(FlinkOptions.EVENT_TIME_FIELD.key());
System.out.println("current eventTime: " + eventTime);
输出结果:
current eventTime: 1667971364799