背景
目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:
class DefaultSource extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider
with DataSourceRegister
with StreamSinkProvider
with StreamSourceProvider
with SparkAdapterSupport
with Serializable {
闲说杂谈
继续上次的Apache Hudi初探(四)涉及的代码:
// HoodieDataSourceInternalBatchWrite 类中的方法:其所涉及的的方法调用链如下:
createBatchWriterFactory => dataWriter.write => dataWriter.commit/abort => dataWriter.close
||
\/
onDataWriterCommit
||
\/
commit/abort
- 在解释commit做的事情之前,DataSourceInternalWriterHelper在构建器阶段还有做了一件事,那就是writeClient.preWrite:
this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
// writeClient是 SparkRDDWriteClient 实例
writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);
- metaClient构建一个HoodieTableMetaClient类型的 hoodie 元数据客户端
如果hoodie.metastore.enable开启(默认是不开启),则新建HoodieTableMetastoreClient类型的实例,否则新建HoodieTableMetastoreClient实例 - writeClient.preWrite 这是在写入数据前做的准备工作
- 根据hoodie.write.concurrency.mode设置的模式来判断(默认是single_writer,还有个选项是optimistic_concurrency_control),如果是OCC则会获取上一次成功的事务,否则为空
- 是否开启异步clean清理服务 会根据hoodie.clean.automatic(默认是true)或者hoodie.clean.async(默认是false)和hoodie.table.services.enabled(默认是true),来启动AsyncCleanerService.startAsyncCleaningIfEnabled
- 是否开启archive归档服务,会根据hoodie.archive.automatic(默认是true)或者hoodie.archive.async(默认是false)和hoodie.table.
services.enabled(默认是true) 来启动服务 AsyncCleanerService.startAsyncArchiveIfEnabled - 所以默认情况clean和Archive服务都不是异步后台服务
- 来看commit所做的事情,它最终会调用到dataSourceInternalWriterHelper.commit方法:
public void commit(List<HoodieWriteStat> writeStatList) {
try {
writeClient.commitStats(instantTime, writeStatList, Option.of(extraMetadata),
CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
} finally {
writeClient.close();
}
}
这里的writeClient是SparkRDDWriteClient的实例,该实例的对一个的commit方法的如下:
public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
// Skip the empty commit if not allowed
if (!config.allowEmptyCommit() && stats.isEmpty()) {
return true;
}
LOG.info("Committing " + instantTime + " action " + commitActionType);
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = createTable(config, hadoopConf);
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
extraMetadata, operationType, config.getWriteSchema(), commitActionType);
HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime);
HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
this.txnManager.beginTransaction(Option.of(inflightInstant),
lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
preCommit(inflightInstant, metadata);
commit(table, commitActionType, instantTime, metadata, stats);
// already within lock, and so no lock requried for archival
postCommit(table, metadata, instantTime, extraMetadata, false);
LOG.info("Committed " + instantTime);
releaseResources();
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
} finally {
this.txnManager.endTransaction(Option.of(inflightInstant));
}
// We don't want to fail the commit if hoodie.fail.writes.on.inline.table.service.exception is false. We catch warn if false
try {
// do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
runTableServicesInline(table, metadata, extraMetadata);
} catch (Exception e) {
if (config.isFailOnInlineTableServiceExceptionEnabled()) {
throw e;
}
LOG.warn("Inline compaction or clustering failed with exception: " + e.getMessage()
+ ". Moving further since \"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.");
}
emitCommitMetrics(instantTime, metadata, commitActionType);
// callback if needed.
if (config.writeCommitCallbackOn()) {
if (null == commitCallback) {
commitCallback = HoodieCommitCallbackFactory.create(config);
}
commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath(), stats));
}
return true;
}
- 如果是不允许空提交(hoodie.allow.empty.commit默认是true,也就是允许空提交),也就是没有任何数据插入的情况下,就直接返回
这对于比如offset的元数据也是需要记录下来的 - createTable 新建一个HoodieTable,这里我们加入建立了HoodieSparkMergeOnReadTable类型的表
- CommitUtils.buildMetadata 构造元信息,
其中传入的参数operationType是bulk_insert,schemaToStoreInCommit是avro schema(之前有设置),commitActionType为deltacommit,partitionToReplaceFileIds为Map.empty,这里只是构建了HoodieCommitMetadata对象,把对应的元数据的信息记录了下来 - HoodieInstant 新建了一个HoodieInstant类型的实例,这里是表明是inflight阶段
- 判断heartbeat是否超时,如果是hoodie.cleaner.policy.failed.writes是LAZY,且超时,则报异常
- txnManager.beginTransaction 开启事务,主要是获取锁
如果是hoodie.write.concurrency.mode是optimistic_concurrency_control,则会开启事务,因为这种情况下会存在冲突的可能性- lockManager.lock() 从hoodie.write.lock.provider配置中获取锁,默认是ZookeeperBasedLockProvider 实现是基于InterProcessMutex
会基于hoodie.metrics.lock.enable的配置是否开启lock时期的metrics - reset(currentTxnOwnerInstant 把这次的TxnOwnerInstant设置为currentTxnOwnerInstant
- lockManager.lock() 从hoodie.write.lock.provider配置中获取锁,默认是ZookeeperBasedLockProvider 实现是基于InterProcessMutex