Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作)

news2025/1/10 1:53:03

背景

在Apache Hudi初探(一)(与flink的结合)中,我们提到了Pipelines.hoodieStreamWrite 写hudi文件,这个操作真正写hudi是在Pipelines.hoodieStreamWrite方法下的transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory),具体分析一下写入的过程。

分析

对于transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)这个代码片段,我们主要看operatorFactory 这个对象(transform这个操作是Flink框架的操作):

public class StreamWriteOperator<I> extends AbstractWriteOperator<I> {

  public StreamWriteOperator(Configuration conf) {
    super(new StreamWriteFunction<>(conf));
  }

  public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
    return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf));
  }
}

最主要的hudi算子为StreamWriteOperator,其中最主要的操作是由StreamWriteFunction来完成的:

// StreamWriteFunction
  

   @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
    this.metaClient = StreamerUtil.createMetaClient(this.config);
    this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext());
    this.writeStatuses = new ArrayList<>();
    this.writeMetadataState = context.getOperatorStateStore().getListState(
        new ListStateDescriptor<>(
            "write-metadata-state",
            TypeInformation.of(WriteMetadataEvent.class)
        ));

    this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), this.metaClient.getBasePath());
    this.currentInstant = lastPendingInstant();
    if (context.isRestored()) {
      restoreWriteMetadata();
    } else {
      sendBootstrapEvent();
    }
    // blocks flushing until the coordinator starts a new instant
    this.confirming = true;
  }

  @Override
  public void open(Configuration parameters) throws IOException {
    this.tracer = new TotalSizeTracer(this.config);
    initBuffer();
    initWriteFunction();
  }

  @Override
  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    if (inputEnded) {
      return;
    }
    snapshotState();
    // Reload the snapshot state as the current state.
    reloadWriteMetaState();
  }

  @Override
  public void snapshotState() {
    // Based on the fact that the coordinator starts the checkpoint first,
    // it would check the validity.
    // wait for the buffer data flush out and request a new instant
    flushRemaining(false);
  }

  @Override
  public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {
    bufferRecord((HoodieRecord<?>) value);
  }

  • initializeState操作,主要是做一些初始化的操作

    • this.taskID = getRuntimeContext().getIndexOfThisSubtask();
      获取当前的task的索引下标,用来向operator coordinator发送event给operator coordinator,之后 StreamWriteOperatorCoordinator(operator coordinator) 进行处理,后续会说到StreamWriteOperatorCoordinator

    • metaClient = StreamerUtil.createMetaClient(this.config)
      writeClient = FlinkWriteClients.createWriteClient
      初始化hudi的元数据客户端(这里是HoodieTableMetaClient)和写入客户端(这里是HoodieFlinkWriteClient)

    • writeStatuses = new ArrayList<>()
      记录后续的写入hudi文件的信息

    • writeMetadataState = context.getOperatorStateStore().getListState
      记录写入hudi的元数据事件,会在后续的操作中,会包装成event发送给operator coordinator(StreamWriteOperatorCoordinator)

    • ckpMetadata = CkpMetadata.getInstance
      Flink的checkpoint的元数据信息路径,默认的路径是/${hoodie.basePath}/.hoodie/.aux/ckp_meta

    • currentInstant = lastPendingInstant()
      获取上次还没有完成的commit

    • restoreWriteMetadata或者sendBootstrapEvent,根据是否是从checkpoint恢复过来的进行不同消息的发送,
      这里的operator coordinator(StreamWriteOperatorCoordinator)会进行统一的处理,并初始化一个commit

  • open操作
    写入hudi前的前置操作,比如说 初始化TotalSizeTracer记录maxBufferSize便于flush操作
    根据write.operation的值(默认是upsert)选择后续的操作是insert或upsert或overwrite,这里是upsert

  • processElement操作
    这里对传入的HoodieRecord进行缓存,主要是bufferRecord做的事情,

    • 首先会获取bucketID,之后再往对应的bucket中插入数据
    • 如果超出write.batch.size(默认是128MB),则会进行flushBucket操作,该操作主要是写入hudi操作 //TODO: 具体的写入hudi操作
      • 首先会获取新的需要提交的commit
      • 再进行写入的实际操作
      • 写入的文件元数据信息回传到operator coordinator进行统一处理
  • snapshotState 操作

    • 调用flushRemaining 写入剩下的数据到hudi存储中
    • 重新加载当前写入的hudi文件元数据信息到当前flink的state中

hudi StreamWriteOperatorCoordinator作用

总的来说,StreamWriteOperatorCoordinator扮演的角色和在Spark中driver的角色一样,都是来最后来提交 元数据信息到huid中。
具体的作用还是得从具体的方法来看:

  @Override
  public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
    ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
        "The coordinator can only handle WriteMetaEvent");
    WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;

    if (event.isEndInput()) {
      // handle end input event synchronously
      // wrap handleEndInputEvent in executeSync to preserve the order of events
      executor.executeSync(() -> handleEndInputEvent(event), "handle end input event for instant %s", this.instant);
    } else {
      executor.execute(
          () -> {
            if (event.isBootstrap()) {
              handleBootstrapEvent(event);
            } else {
              handleWriteMetaEvent(event);
            }
          }, "handle write metadata event for instant %s", this.instant
      );
    }
  }
  ...
  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    executor.execute(
        () -> {
          // The executor thread inherits the classloader of the #notifyCheckpointComplete
          // caller, which is a AppClassLoader.
          Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
          // for streaming mode, commits the ever received events anyway,
          // the stream write task snapshot and flush the data buffer synchronously in sequence,
          // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
          final boolean committed = commitInstant(this.instant, checkpointId);

          if (tableState.scheduleCompaction) {
            // if async compaction is on, schedule the compaction
            CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
          }

          if (tableState.scheduleClustering) {
            // if async clustering is on, schedule the clustering
            ClusteringUtil.scheduleClustering(conf, writeClient, committed);
          }

          if (committed) {
            // start new instant.
            startInstant();
            // sync Hive if is enabled
            syncHiveAsync();
          }
        }, "commits the instant %s", this.instant
    );
  }
  • handleEventFromOperator方法用来接受task发送的消息

    • 对于BootStrap类型的WriteMetadataEvent(在StreamWriteFunction方法initializeState中),相当于函数初始化也就会触发
      该类型的消息由handleBootstrapEvent来处理(我们这里假设每个任务operator都完成了初始化的操作),对应的数据流如下:

      initInstant
         ||
         \/
      reset => startInstant
      

      startInstant 这里就会初始化一个hudi写操作的commit信息

    • 对于一般的write的信息的event,(比如说在processElement的flushBucket函数中),由handleWriteMetaEvent来处理:

       if (this.eventBuffer[event.getTaskID()] != null) {
       this.eventBuffer[event.getTaskID()].mergeWith(event);
       } else {
         this.eventBuffer[event.getTaskID()] = event;
       }
      

      这里只是加到变量名为eventBuffer 的WriteMetadataEvent类型的数组中,后续中会进行处理

    • 对于isEndInputtrue的event,这种一般source是基于文件的这种,这里先不讨论

  • notifyCheckpointComplete 当对应的checkpointId完成以后,该方法会被调用

    • commitInstant 提交hudi元数据,如果如果有发生异常,则回滚当前hudi对应的commit
    • scheduleCompaction && scheduleClustering 进行hui的CompcationClustering
    • 如果成功的提交了,则会开启一个新的commit,如果开了hive同步(hive_sync.enabled默认为false),则会同步元数据信息到hive

总结

用一张图总结一下交互方式,如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/909315.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

稳定扩散ControlNet v1.1 权威指南

ControlNet 是一种稳定扩散模型&#xff0c;可让你从参考图像中复制构图或人体姿势。 经验丰富的稳定扩散用户知道生成想要的确切成分有多难。图像有点随机。你所能做的就是玩数字游戏&#xff1a;生成大量图像并选择你喜欢的图片。 借助 ControlNet&#xff0c;稳定扩散用户…

创建型(二) - 单例模式

一、概念 单例设计模式&#xff08;Singleton Design Pattern&#xff09;&#xff1a;一个类只允许创建一个对象&#xff08;或者实例&#xff09;&#xff0c;那这个类就是一个单例类。 优点&#xff1a;在内存里只有一个实例&#xff0c;减少了内存的开销&#xff0c;避免…

VMwar安装Centos7保姆级教程

下载文件 首先我们先下载Centos7的官方镜像和VM虚拟机软件 下面是百度云盘的下载链接 链接&#xff1a;https://pan.baidu.com/s/1aF55_F9IK4pFB45d5vHBmg?pwd87vc 提取码&#xff1a;87vc –来自百度网盘超级会员V1的分享 安装虚拟机 首先我们先把VMware16.1.0.rar文件解压…

PDF怎么转成PPT文件免费?一个软件解决

随着科技的不断发展和进步&#xff0c;电子文档已经成为我们日常工作和学习中不可或缺的一部分。PDF作为一种跨平台的文件格式&#xff0c;以其可靠性和易读性而备受推崇。然而&#xff0c;在某些情况下&#xff0c;我们可能需要PDF怎么转成PPT文件免费&#xff0c;以便更好地展…

Android Studio 接入OpenCV最简单的例子 : 实现灰度图效果

1. 前言 上文 我们在Windows电脑上实现了人脸功能&#xff0c;接下来我们要把人脸识别的功能移植到Android上。 那么首先第一步&#xff0c;就是要创建一个Native的Android项目&#xff0c;并且配置好OpenGL&#xff0c;并能够调用成功。 这里我们使用的是openCV-4.8.0&#x…

【HCIP】生成树--STP

一、STP 1.产生背景 在星状拓扑或者树形拓扑中&#xff0c;当某个设备或者某条链路出现故障&#xff0c;就会导致数据不能正常转发&#xff0c;出现单点故障的问题。 为了防止出现单点故障&#xff0c;一般需要环形拓扑来保证链路的冗余性&#xff0c;当某条链路出现故障&…

基于Jenkins构建生产CICD环境-------从小白到大神之路之学习运维第84天

第四阶段 时 间&#xff1a;2023年8月21日 参加人&#xff1a;全班人员 内 容&#xff1a; 基于Jenkins构建生产CICD环境 目录 一、环境概述 二、Jenkins简介 &#xff08;一&#xff09;Jenkins 包含以下几个特点&#xff1a; &#xff08;二&#xff09;持续集成 …

Matlab使用

Matlab使用 界面介绍 新建脚本&#xff1a;实际上就是新建一个新建后缀为.m的文件 新建编辑器&#xff1a;ctrlN 打开&#xff1a;打开最近文件&#xff0c;以找到最近写过的文件 点击路径&#xff0c;切换当前文件夹 预设&#xff1a;定制习惯用的界面 常见简单指令 ;…

S05-巧用单元格格式转换数据

视频教程 文章目录 S05-巧用单元格格式转换数据 S05-巧用单元格格式转换数据 格式类型默认格式&#xff08;常规&#xff09;转换格式数值1.21.200货币1.2&#xffe5;1.20会计专用1.2&#xffe5;1.20日期43567四月十二日时间0.3333333338:00 AM百分比1.2120.00%分数0.21/5科…

云养猪平台如何开发

随着数字化和智能化的发展&#xff0c;农业行业也逐渐开始融入互联网技术&#xff0c;其中云养猪平台作为新兴的农业数字化解决方案之一&#xff0c;备受关注。本文将探讨如何开发一款具备专业、思考深度和逻辑性的云养猪平台。 一、前期准备阶段&#xff1a; 1.明确目…

分类预测 | MATLAB实现1D-2D-CNN-GRU的多通道输入数据分类预测

分类预测 | MATLAB实现1D-2D-CNN-GRU的多通道输入数据分类预测 目录 分类预测 | MATLAB实现1D-2D-CNN-GRU的多通道输入数据分类预测分类效果基本介绍程序设计参考资料 分类效果 基本介绍 结合1D时序-2D图像多模态融合的CNN-GRU故障识别算法&#xff0c;基于一维时序信号和二维图…

k8s之nodes

概念&#xff1a; Node是Kubernetes中的工作节点&#xff0c;最开始被称为minion。一个Node可以是VM或物理机。每个Node&#xff08;节点&#xff09;具有运行pod的一些必要服务&#xff0c;并由Master组件进行管理&#xff0c;Node节点上的服务包括Docker、kubelet和kube-pro…

派森 #P126. 维吉尼亚加密

描述 维吉尼亚密码引入了“密钥”的概念&#xff0c;即根据密钥来决定字符的替换关系。 如上图为维吉尼亚密码的加密过程示意&#xff0c;左边为加密替换表&#xff0c;上面第一行代表明文字母&#xff0c;左面第一列代表密钥字母&#xff0c;对如下明文加密&#xff1a;‪‬…

在飞腾麒麟linux上编译安装tcpdump小结

1 从&#xff1a;【免费】在麒麟Linux下编译所需要的四个源文件资源-CSDN文库&#xff0c;https://download.csdn.net/download/dijkstar/88236907下载几个源文件&#xff1a; 2 按照上面文件列表的顺序解压、编译、安装&#xff1a; &#xff08;注意&#xff1a;操作之前&am…

贴片电阻的额定功率

1、不同封装电阻的额定功率不同 通常是越大的封装&#xff0c;额定功率越大&#xff0c;常见的封装和功率对应如下表&#xff1a; 封装 功率 0402 62.5mW 0603 100mW 0805 125mW 1206 250mW 1210 500mW 2、关于额定功率和环境温度 额定功率&#xff1a;在额定环境…

【Spring专题】Spring之Bean的生命周期源码解析——阶段二(三)(属性填充之循环依赖底层原理解析)

目录 前置知识循环依赖的产生Spring里面的3个Map 课程内容一、只有一级缓存的推理演进1.1 直接将实例化后生成的对象放入到单例池里面1.1 引入一个中间Map存实例化后的早期对象&#xff08;疑似二级缓存&#xff09;1.3 解决1.2需要被代理的问题&#xff08;疑似二级缓存&#…

鸿蒙应用开发学习路线(OpenHarmony/HarmonyOS)

鸿蒙应用开发学习路线&#xff08;OpenHarmony/HarmonyOS&#xff09; HarmonyOS应用开发学习路线网站汇总社区汇总视频学习路线 OpenHarmony应用开发学习路线与资料网站汇总社区汇总学习路线 MarkDown工具推荐 HarmonyOS应用开发学习路线 作者&#xff1a;坚果 团队&#xff1…

gremlin安装使用 详细步骤

gremlin是一个图数据库查询工具&#xff0c;注意他只是一个工具类似于dbeaver&#xff0c;navicat&#xff0c;sqlyog&#xff0c;是专门来分析图数据库的一个工具。 下载 下载地址Apache Download Mirrors 省事的可以直接 wget https://www.apache.org/dyn/closer.lua/tin…

LC-平衡二叉树

LC-平衡二叉树 链接&#xff1a;https://leetcode.cn/problems/balanced-binary-tree/description/ 描述&#xff1a;给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树。 本题中&#xff0c;一棵高度平衡二叉树定义为&#xff1a;一个二叉树每个节点 的左右两个子树的…

电容笔哪个厂家的产品比较好?开学值得买电容笔推荐

开学快要到来了&#xff0c;各位学生党又开始为开学而准备&#xff0c;而电容笔对于学生党来说是必备的数码产品。苹果的正版Pencil&#xff0c;由于价格很贵&#xff0c;仅仅一支的售价就要近千块钱&#xff0c;所以很多人都买不起。现在国内的平板电脑电容笔已经很完美了&…