#Apache Hudi初探(五)(与spark的结合)

news2024/12/23 19:42:38

背景

目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {

闲说杂谈

继续上次的Apache Hudi初探(四)涉及的代码:

 // HoodieDataSourceInternalBatchWrite 类中的方法:其所涉及的的方法调用链如下:
 createBatchWriterFactory => dataWriter.write => dataWriter.commit/abort => dataWriter.close
     ||
     \/
 onDataWriterCommit
     ||
     \/
 commit/abort
  • 在解释commit做的事情之前,DataSourceInternalWriterHelper在构建器阶段还有做了一件事,那就是writeClient.preWrite
    this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
    // writeClient是 SparkRDDWriteClient 实例
    writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);
  • metaClient构建一个HoodieTableMetaClient类型的 hoodie 元数据客户端
    如果hoodie.metastore.enable开启(默认是不开启),则新建HoodieTableMetastoreClient类型的实例,否则新建HoodieTableMetastoreClient实例
  • writeClient.preWrite 这是在写入数据前做的准备工作
    • 根据hoodie.write.concurrency.mode设置的模式来判断(默认是single_writer,还有个选项是optimistic_concurrency_control),如果是OCC则会获取上一次成功的事务,否则为空
    • 是否开启异步clean清理服务 会根据hoodie.clean.automatic(默认是true)或者hoodie.clean.async(默认是false)和hoodie.table.services.enabled(默认是true),来启动AsyncCleanerService.startAsyncCleaningIfEnabled
    • 是否开启archive归档服务,会根据hoodie.archive.automatic(默认是true)或者hoodie.archive.async(默认是false)和hoodie.table.
      services.enabled
      (默认是true) 来启动服务 AsyncCleanerService.startAsyncArchiveIfEnabled
    • 所以默认情况clean和Archive服务都不是异步后台服务
  • 来看commit所做的事情,它最终会调用到dataSourceInternalWriterHelper.commit方法:
public void commit(List<HoodieWriteStat> writeStatList) {
    try {
      writeClient.commitStats(instantTime, writeStatList, Option.of(extraMetadata),
          CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
    } catch (Exception ioe) {
      throw new HoodieException(ioe.getMessage(), ioe);
    } finally {
      writeClient.close();
    }
  }

这里的writeClientSparkRDDWriteClient的实例,该实例的对一个的commit方法的如下:

public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
                             String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
    // Skip the empty commit if not allowed
    if (!config.allowEmptyCommit() && stats.isEmpty()) {
      return true;
    }
    LOG.info("Committing " + instantTime + " action " + commitActionType);
    // Create a Hoodie table which encapsulated the commits and files visible
    HoodieTable table = createTable(config, hadoopConf);
    HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
        extraMetadata, operationType, config.getWriteSchema(), commitActionType);
    HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime);
    HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
    this.txnManager.beginTransaction(Option.of(inflightInstant),
        lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
    try {
      preCommit(inflightInstant, metadata);
      commit(table, commitActionType, instantTime, metadata, stats);
      // already within lock, and so no lock requried for archival
      postCommit(table, metadata, instantTime, extraMetadata, false);
      LOG.info("Committed " + instantTime);
      releaseResources();
    } catch (IOException e) {
      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
    } finally {
      this.txnManager.endTransaction(Option.of(inflightInstant));
    }

    // We don't want to fail the commit if hoodie.fail.writes.on.inline.table.service.exception is false. We catch warn if false
    try {
      // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
      runTableServicesInline(table, metadata, extraMetadata);
    } catch (Exception e) {
      if (config.isFailOnInlineTableServiceExceptionEnabled()) {
        throw e;
      }
      LOG.warn("Inline compaction or clustering failed with exception: " + e.getMessage()
          + ". Moving further since \"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.");
    }

    emitCommitMetrics(instantTime, metadata, commitActionType);

    // callback if needed.
    if (config.writeCommitCallbackOn()) {
      if (null == commitCallback) {
        commitCallback = HoodieCommitCallbackFactory.create(config);
      }
      commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath(), stats));
    }
    return true;
  }
  • 如果是不允许空提交(hoodie.allow.empty.commit默认是true,也就是允许空提交),也就是没有任何数据插入的情况下,就直接返回
    这对于比如offset的元数据也是需要记录下来的
  • createTable 新建一个HoodieTable,这里我们加入建立了HoodieSparkMergeOnReadTable类型的表
  • CommitUtils.buildMetadata 构造元信息,
    其中传入的参数operationTypebulk_insertschemaToStoreInCommit是avro schema(之前有设置),commitActionTypedeltacommit,partitionToReplaceFileIdsMap.empty,这里只是构建了HoodieCommitMetadata对象,把对应的元数据的信息记录了下来
  • HoodieInstant 新建了一个HoodieInstant类型的实例,这里是表明是inflight阶段
  • 判断heartbeat是否超时,如果是hoodie.cleaner.policy.failed.writesLAZY,且超时,则报异常
  • txnManager.beginTransaction 开启事务,主要是获取锁
    如果是hoodie.write.concurrency.modeoptimistic_concurrency_control,则会开启事务,因为这种情况下会存在冲突的可能性
    • lockManager.lock()hoodie.write.lock.provider配置中获取锁,默认是ZookeeperBasedLockProvider 实现是基于InterProcessMutex
      会基于hoodie.metrics.lock.enable的配置是否开启lock时期的metrics
    • reset(currentTxnOwnerInstant 把这次的TxnOwnerInstant设置为currentTxnOwnerInstant

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/530287.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RK3588平台开发系列讲解(进程篇)Linux 进程的数据结构

平台内核版本安卓版本RK3588Linux 5.10Android 12文章目录 一、Linux 进程的数据结构二、创建 task_struct 结构三、Linux 进程地址空间四、Linux 进程文件表沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 本篇将介绍 Linux 如何表示进程。 一、Linux 进程的数据结构…

Java测试:OJ练习(字符串合并后返回按照先后顺序排列的不重复新字符串、合并数组并按升序排列、Arrays 类中的sort方法)

1、给定一个长度为n的字符 串&#xff0c;字符串中只包含大小写字母。 请你返回该字符串拥有那些字符。 并将它们按照出现的先后&#xff01;顺序拼接成一个新的字符串。 这是我最开始写的&#xff0c;代码有点问题&#xff1a; public String setString(String str) {char[]…

文本三剑客之——Awk

Awk Awk简介Awk语法格式Awk常见内置变量Awk实例演示按行输出文本BEGIN模式和END模式按字段输出文本通过管道&#xff0c;双引号调用shell命令date 的用法getline的用法awk数组 Awk简介 Awk是一个功能强大的编辑工具&#xff0c;用于在Linux/UNIX 下对文本和数据进行处理。数据…

代码随想录算法训练营第六天|242.有效的字母异位词 、349. 两个数组的交集 、202. 快乐数、1. 两数之和

哈希表的表示方式&#xff1a;数组、set、map 数组&#xff1a;范围可控的情况下&#xff0c;可以用数组 set&#xff1a;哈希值较大的情况下&#xff0c;或数值分布很分散的情况 map&#xff1a;key 和 value对应的情况下 能用数组尽量用数组&#xff0c;因为数组会比较快&…

Netty内存管理

关键概念 PoolArena——内存管理的统筹者 PoolArena是内存管理的统筹者。它内部有一个PoolChunkList组成的链表 PoolChunkList——对PoolChunk的管理 PoolChunkList内部有一个PoolChunk组成的链表。通常一个PoolChunkList中的所有PoolChunk使用率(已分配内存/ChunkSize)都在…

机器学习算法分类

机器学习常用算法的分类&#xff1a; 根据数据集组成不同&#xff0c;可以把机器学习算法分为&#xff1a; 监督学习无监督学习半监督学习强化学习 1、监督学习 - 定义&#xff1a; - 输入数据是由输入特征值和目标值所组成 - 函数的输出可以是一个连续的值&#xff08;称为回…

【文本三剑客】AWK

AWK 一、AWK的工作原理1.1命令格式1.2awk常见的内建变量 二、awk实验2.1按行输入文本2.2按字段输出文本2.3通过管道符、双引号调用shell命令 一、AWK的工作原理 逐行读取文本&#xff0c;默认以空格或tab键为分隔符进行分隔&#xff0c;将分隔所得的各个字段保存到内建变量中&…

银行数字化转型导师坚鹏:银行数字化转型面临的5大机遇与4大挑战

在机遇方面&#xff0c;主要面临以下5大机遇&#xff1a; 国家战略及政策机遇&#xff1a;乡村振兴战略、制造强国战略、绿色金融战略等战略的落实将会给银行数字化转型带来新的业务机遇&#xff0c;《中国银保监会关于推动银行业和保险业高质量发展的指导意见》、《关于银行业…

第五章 面向对象-4abstract抽象

1.4 abstract class抽象类 声明抽象类&#xff0c;使用关键字abstract //内部匿名类 Db db new Db(){ };3.了解抽象类 抽象方法 AbstractClassMain.java /** Copyright (c) 2017, 2023, zxy.cn All rights reserved.**/ package cn.practice2;/*** <p>Description:&…

Chatgpt中文版无需代理,ChatGPT镜像

Chatgpt中文版无需代理 网站ChatGPT中文版 ChatGPT中文版是一个基于人工智能技术的聊天机器人&#xff0c;它可以模拟人类的自然语言交互&#xff0c;回答用户的各种问题和提供各种服务。它的核心技术是GPT&#xff08;Generative Pre-trained Transformer&#xff09;模型&am…

基础:Android相关基础知识

目录 1.Android 四大组件 2.Activity生命周期 3.Service的生命周期 4.Service的启动方式 5.Activity的启动模式 6.广播的分类 7.ANR是什么&#xff0c;怎么避免&#xff1f; 8.Handler消息处理机制 9.事件分发机制 10.View绘制流程 11.Binder机制 12.进程间通信 1…

2023最新一键开通主机免费源码

更新了ui 自助开通主机&#xff0c;自己修改服务器 不带接口&#xff0c;不带接口&#xff0c;不带接口 打开api.php文件&#xff0c;把8.8.8.8改服务器ip&#xff0c;123456改成你的密钥 前往我的技术博客查看更多https://202271.xyz/?zhuji 蓝奏云链接 https://wwp.lanz…

如何在Linux中显示网络连接、路由表、接口统计等信息?Netstat了解一下!

Netstat 是一个用于显示网络连接、路由表、接口统计等信息的命令行工具。它在 Linux 和其他类 Unix 系统中都有提供&#xff0c;可以帮助我们分析和诊断网络问题。本文将介绍 Netstat 命令的基本用法和常见选项。 Netstat 命令的语法 Netstat 命令的基本语法如下&#xff1a; …

rtl仿真器-ghdl安装和测试

安装 sudo add-apt-repository ppa:pgavin/ghdl sudo apt-get update sudo apt-get install ghdl gtkwave仿真 rtl add.v library ieee; use ieee.std_logic_1164.all; entity ADD is port (A,B:in bit; SUM,CARRY:out bit); end entity ADD; architecture behave of ADD i…

前端部署vue项目到腾讯云服务器

先把dist包上传服务器 可以使用宝塔、FileZilla、手动上传等等方式 已有腾讯云服务器之后进入面板界面 然后安装Nginx 请一步一步&#xff0c;紧跟步骤 第一步 安装gcc-c 编译器。nginx依赖的 pcre 和 zlib 包 yum -y install gcc zlib zlib-devel pcre-devel openssl openss…

嵌入式通信协议【Modbus】Modbus功能码的详细描述

一、读功能码 1、 01 (0x01)读线圈 在一个远程设备中&#xff0c;使用该功能码读取线圈的 1 至 2000 连续状态。请求 PDU 详细说明了起始地址&#xff0c;即指定的第一个线圈地址和线圈编号。从零开始寻址线圈。因此寻址线圈 1-16 为 0-15。 根据数据域的每个比特将响应报文…

vs 推送代码 之 gitee

我们常常想将自己的代码放入到代码管理工具中&#xff0c;接下来我们将讲解如何去将vs中的代码放入到代码管理工具中 目的&#xff1a;将vs中的项目代码放入到gitee中 首先&#xff1a; 我们需要注册一下gitee的账号&#xff0c;官网&#xff1a;gitee官网 辅助工具&#xff…

做网络那么多年,连以太网接口和串口都分不清?本文值得一看!

路由器是一种网络设备&#xff0c;它的主要功能是在不同的网络之间转发数据包&#xff0c;实现网络互联。路由器根据数据包的目的地址&#xff0c;选择最佳的路径&#xff0c;将数据包发送到下一跳。路由器可以连接不同的网络类型&#xff0c;如以太网、帧中继、PPP等。 路由器…

JavaWeb_Mysql_多表设计与查询

JavaWeb_Mysql_多表设计与查询 多表设计外键约束物理外键 -- 不推荐逻辑外键 多表关系实现 多表查询数据准备内连接外连接子查询标量子查询列子查询行子查询表子查询 案例数据准备案例需求 来源 多表设计 外键约束 物理外键 – 不推荐 概念: 使用foreign key定义外键关联另外…