记录几个Hudi Flink使用问题及解决方法

news2025/1/13 8:01:19

前言

如题,记录几个Hudi Flink使用问题,学习和使用Hudi Flink有一段时间,虽然目前用的还不够深入,但是目前也遇到了几个问题,现在将遇到的这几个问题以及解决方式记录一下

版本

  • Flink 1.15.4
  • Hudi 0.13.0

流写

流写Hudi,必须要开启Checkpoint,这个我在之前的文章:Flink SQL Checkpoint 学习总结提到过。

如果不设置Checkpoint,不会生成commit,感觉像是卡住一样,具体表现为只生成.commit.requested和.inflight,然后不写文件、不生成.commit也不报错,对于新手来说很费劲,很难找到解决方法。

索引

hudi-flink 仅支持两种索引:FLINK_STATEBUCKET,默认FLINK_STATE

最开始使用hudi是用的spark,hudi-spark支持BLOOM索引,hudi java client也支持BLOOM索引,所以认为hudi-flink也支持BLOOM索引,但其实不支持,而且官网并没有相关的文档说明,可以从下面这段代码中看出来

Pipelines.hoodieStreamWrite

public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
    // 如果是`BUCKET`索引
    if (OptionsResolver.isBucketIndexType(conf)) {
      WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
      int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
      String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
      BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
      return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
          .transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
          .uid(opUID("bucket_write", conf))
          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
    } else {
      // 否则按`FLINK_STATE`索引的逻辑
      WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
      return dataStream
          // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
          .keyBy(HoodieRecord::getRecordKey)
          .transform(
              "bucket_assigner",
              TypeInformation.of(HoodieRecord.class),
              new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
          .uid(opUID("bucket_assigner", conf))
          .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
          // shuffle by fileId(bucket id)
          .keyBy(record -> record.getCurrentLocation().getFileId())
          .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
          .uid(opUID("stream_write", conf))
          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
    }
  }

FLINK_STATE 重复问题

如果使用默认的FLINK_STATE索引,在upsert时可能会有重复问题。(之前使用BLOOM索引时不会有这个问题)

问题复现

先写一部分数据作为历史数据到Hudi表,然后再写相同的数据到这个表,最后count表发现数据量变多,也就是有重复数据。
主要参数:

set parallelism.default=12;
set taskmanager.numberOfTaskSlots=2;

'write.operation'='upsert',
'write.tasks'='11', 
'table.type'='COPY_ON_WRITE', 

场景为kafka2hudi,kafka数据量200w,没有重复,设置并发的主要目的是为了将数据打散分布在不同的文件里,这样更容易复现问题。(因为如果只有一个历史文件时,很难复现)

第一次任务跑完表数据量为200w,第二次跑完表数据量大于200w,证明数据重复。

重复原因

index state:保存在state中的主键和文件ID的对应关系
重复的原因为FLINK_STATE将主键和文件ID的对应关系保存在state中,当新启动一个任务时,index state需要重新建立,而默认情况下不会包含历史文件的index state,只会建立新数据的index state,所以对于没有历史文件的新表是不会有重复问题的。(对于有历史文件的表,如果从checkpoint恢复也不会有重复问题,因为从checkpoint恢复时,也恢复了之前历史文件的index state

解决方法

通过参数index.bootstrap.enabled解决,默认为false,当为true时,写hudi任务启动时会先引导(加载)历史文件的index state

'index.bootstrap.enabled'='true'

除了重复问题,FLINK_STATE因为将index保存在state中,所以随着数据量的增加,state越来越大。这样对于数据量特别大的表,对内存的要求也会很高,所以会遇到内存不足OOM的问题。 所以建议对于大表,还是选择使用BUCKET索引。

增量数据,‘index.bootstrap.enabled’='false’时的checkpoint记录,checkpoint大小开始很小,然后逐渐增加

增量数据,‘index.bootstrap.enabled’='true’时的checkpoint记录,checkpoint大小开始和结束差不多大

BUCKET INDEX

BUCKET索引需要根据表数据量大小设定好桶数(hoodie.bucket.index.num.buckets),但是默认情况下不能动态调整bucket数量。

另外可以通过参数hoodie.index.bucket.engine将其值设为CONSISTENT_HASHING,通过一致性哈希实现动态调整bucket数量,但是仅支持MOR表,我还没有试过这个功能,大家可以通过官网:https://hudi.apache.org/docs/configurations/了解相关参数自行测试。

hoodie.index.bucket.engine | SIMPLE (Optional) | org.apache.hudi.index.HoodieIndex$BucketIndexEngineType: Determines the type of bucketing or hashing to use when hoodie.index.type is set to BUCKET. SIMPLE(default): Uses a fixed number of buckets for file groups which cannot shrink or expand. This works for both COW and MOR tables. CONSISTENT_HASHING: Supports dynamic number of buckets with bucket resizing to properly size each bucket. This solves potential data skew problem where one bucket can be significantly larger than others in SIMPLE engine type. This only works with MOR tables.

Config Param: BUCKET_INDEX_ENGINE_TYPE
Since Version: 0.11.0

BUCKET索引主要参数:

'index.type' =  'BUCKET', -- flink只支持两种index,默认FLINK_STATE index,FLINK_STATE index对于数据量比较大的情况会因为tm内存不足导致GC OOM
'hoodie.bucket.index.num.buckets' = '16', -- 桶数

注意,index.type是flink客户端独有的,和公共的不一样(使用公共参数不生效),没有前缀hoodie.,而桶数配置项是hudi公共参数,对于flink客户端哪些用公共参数哪些用flink独有的参数,官方文档并没有提供,需要自己在类org.apache.hudi.configuration.FlinkOptions查看,该类中的参数为flink重写的独有参数,没有的话则需要使用公共参数

insert转upsert问题

对于BUCKET如果先insert一部分历史数据,再upsert增量数据时,默认参数配置会抛出如下异常:
(复现此问题只需要批写一条数据即可)

Caused by: java.lang.NumberFormatException: For input string: "4ff32a41"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:79)
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.lambda$bootstrapIndexIfNeed$1(BucketStreamWriteFunction.java:162)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.bootstrapIndexIfNeed(BucketStreamWriteFunction.java:160)
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.processElement(BucketStreamWriteFunction.java:112)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)

原因是:默认参数下,insert时没有按照bucket索引的逻辑写文件,而upsert是按照bucket逻辑写文件的,bucket索引写的文件名前缀都带有桶号,不是bucket索引写的文件名没有桶号,所以upsert时会尝试解析insert写的历史文件的桶号,导致解析失败。

bucket索引逻辑写的文件

/tmp/cdc/hudi_sink_insert/4ff32a41-4232-4f47-855a-6364eb1d6ce8-0_0-1-0_20230820210751280.parquet

bucket索引逻辑写的文件

/tmp/cdc/hudi_sink_insert/00000000-82f4-48a5-85e9-2c4bb9679360_0-1-0_20230820211542006.parquet

解决方法

对于实际应用场景是有这种先insert在upsert的需求的,解决方法就是尝试通过配置参数使insert也按照bucket索引的逻辑写数据
主要参数:'write.insert.cluster'='true'
相关参数:

'write.operation'='insert', 
'table.type'='COPY_ON_WRITE',
'write.insert.cluster'='true',
'index.type' =  'BUCKET',

我是通过阅读源码发现这个参数可以使insert按照bucket逻辑写数据的
对应的源码在HoodieTableSink.getSinkRuntimeProvider,我在上篇文章Hudi Flink SQL源码调试学习(一)中分析了写hudi时是如何调用到这个方法的,感兴趣得可以看一下。

public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return (DataStreamSinkProviderAdapter) dataStream -> {

      // setup configuration
      long ckpTimeout = dataStream.getExecutionEnvironment()
          .getCheckpointConfig().getCheckpointTimeout();
      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
      // set up default parallelism
      OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());

      RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

      // bulk_insert mode
      final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
      if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
        return Pipelines.bulkInsert(conf, rowType, dataStream);
      }

      // Append mode
      if (OptionsResolver.isAppendMode(conf)) {
        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
        if (OptionsResolver.needsAsyncClustering(conf)) {
          return Pipelines.cluster(conf, rowType, pipeline);
        } else {
          return Pipelines.dummySink(pipeline);
        }
      }

      DataStream<Object> pipeline;
      // bootstrap
      final DataStream<HoodieRecord> hoodieRecordDataStream =
          Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
      // write pipeline
      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
      // compaction
      if (OptionsResolver.needsAsyncCompaction(conf)) {
        // use synchronous compaction for bounded source.
        if (context.isBounded()) {
          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        }
        return Pipelines.compact(conf, pipeline);
      } else {
        return Pipelines.clean(conf, pipeline);
      }
    };
  }

我们在上面的代码中可以发现,当是append模式时会走单独的写逻辑,不是append模式时,才会走下面的Pipelines.hoodieStreamWrite,那么就需要看一下append模式的判断逻辑

OptionsResolver.isAppendMode(conf)

  public static boolean isAppendMode(Configuration conf) {
    // 1. inline clustering is supported for COW table;
    // 2. async clustering is supported for both COW and MOR table
    return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER)
        || needsScheduleClustering(conf);
  }

对于cow表insert时,默认参数的情况needsScheduleClustering(conf)返回false,而!conf.getBoolean(FlinkOptions.INSERT_CLUSTER)返回true,所以只需要让!conf.getBoolean(FlinkOptions.INSERT_CLUSTER)返回false就可以跳过append模式的逻辑了,也就是上面的 'write.insert.cluster'='true'。(每个版本的源码不太一样,所以对于其他版本,可能这个参数并不能解决该问题)

Hive查询异常

记录一个Hive SQL查询Hudi表的异常

异常信息

Caused by: java.lang.ClassCastException: org.apache.hadoop.io.ArrayWritable cannot be cast to org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
    at org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.deliverVectorizedRowBatch(VectorMapOperator.java:803)
    at org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.process(VectorMapOperator.java:845)
    ... 20 more


java.lang.ClassCastException: org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch cannot be cast to org.apache.hadoop.io.ArrayWritable

java.lang.ClassCastException: org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch cannot be cast to org.apache.hadoop.io.ArrayWritable    

异常复现

找一个hudi mor表的rt表,执行count语句(有人反馈聚合函数也会出现此异常)

解决方法

set hive.vectorized.execution.enabled=false; (我验证的这一个参数就可以了)
set hive.vectorized.execution.reduce.enabled=false;(不确定此参数是否必须)

相关阅读

  • Flink SQL Checkpoint 学习总结
  • Hudi Flink SQL源码调试学习(一)
  • Flink SQL操作Hudi并同步Hive使用总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/908468.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一百六十三、Kettle——Linux上安装Kettle9.2(亲测有效,附截图)

一、目的 由于之前发现kettle8.2和kettle9.3这两个版本&#xff0c;或多或少的存在问题 比如kettle8.2的本地服务没问题&#xff0c;但在Linux上创建共享资源库时就有问题&#xff1b; 比如kettle9.3由于不自带shims驱动包&#xff0c;目前在新的下载官网上无法找到下载路径…

Gitbook超详细使用教程,搭建属于你自己的博客!

文章目录 简介与github同步1.创建space2.安装github插件3.同步github4.生成space的url 博客搭建指南1.自定义域名2.发表博客内容3.设置域名默认页面4.界面设置注意事项 End 简介 Gitbook 是一个平台&#xff0c;允许用户创建和分享内容丰富的在线书籍。它有一个用户友好的界面…

JDK JRE JVM 三者之间的详解

JDK : Java Development Kit JRE: Java Runtime Environment JVM : JAVA Virtual Machine JDK : Java Development Kit JDK : Java Development Kit【 Java开发者工具】&#xff0c;可以从上图可以看出&#xff0c;JDK包含JRE&#xff1b;java自己的一些开发工具中&#…

SpringBootWeb案例 Part 2

3. 员工管理 完成了部门管理的功能开发之后&#xff0c;我们进入到下一环节员工管理功能的开发。 基于以上原型&#xff0c;我们可以把员工管理功能分为&#xff1a; 分页查询 带条件的分页查询 删除员工 新增员工 修改员工 那下面我们就先从分页查询功能开始学习。 3.…

内存分布(以及new,delete)

今天给大家说下内存分布&#xff0c;我们都知道的是&#xff0c;像局部变量都在栈区&#xff0c;但是像我们自己有时候申请的空间都在堆区&#xff0c;当然&#xff0c;内存分布不只只是栈区和堆区&#xff0c;还有常量区&#xff0c;代码区等等。如下图&#xff1a; 这就是内存…

项目实战笔记4:敏捷

术语介绍 敏捷项目管理是一种以快速响应变化为核心的项目管理方法。与传统的瀑布模型不同&#xff0c;敏捷方法强调迭代开发和紧密的团队合作。其目的是尽可能快地交付可用的产品&#xff0c;然后在客户和团队之间进行反馈和迭代&#xff0c;以不断优化产品和开发过程。 在敏捷…

电商转化率是什么意思,怎么计算和提高电商转化率?

电商转化率是指访问电商网站的用户中&#xff0c;实际完成购买行为的比例。它可以衡量电商网站的销售能力和用户转化效果&#xff0c;是衡量电商运营效果的重要指标之一。 一、电商转化率的计算公式 电商转化率的计算公式为&#xff1a;转化率完成购买的用户数/访问网站的用户…

PyTorch DataLoader 报错 “DataLoader worker exited unexpectedly“ 的解决方案

注意&#xff1a;博主没有重写d2l的源代码文件&#xff0c;而是创建了一个新的python文件&#xff0c;并重写了该方法。 一、代码运行日志 C:\Users\Administrator\anaconda3\envs\limu\python.exe G:/PyCharmProjects/limu-d2l/ch03/softmax_regression.py Traceback (most r…

Python Opencv实践 - 图像中值滤波

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/pomeranian.png", cv.IMREAD_COLOR) print(img.shape) pixel_count img.shape[0] * img.shape[1] print(pixel_count)#为图像添加椒盐噪声 #参考资料&#xf…

Java后端开发面试题——框架篇

Spring框架中的bean是单例的吗&#xff1f;Spring框架中的单例bean是线程安全的吗&#xff1f; singleton : bean在每个Spring IOC容器中只有一个实例。 prototype&#xff1a;一个bean的定义可以有多个实例。 Spring bean并没有可变的状态(比如Service类和DAO类)&#xff0c…

Masterstudy主题 - 用于线上教育、在线学习和在线课程的LMS WordPress主题

Masterstudy主题是每个人的最佳选择&#xff01;它是一个完整的线上教育WordPress主题&#xff0c;适合所有想要创建在线课程、辅导和语言中心、在线学习平台并在全球范围内传播知识的人。这是一个完美的教育主题&#xff0c;旨在满足学习行业的需求。 网址&#xff1a;Master…

Python功能制作之简单的音乐播放器

需要导入的库&#xff1a; pip install PyQt5 源码&#xff1a; import os from PyQt5.QtCore import Qt, QUrl from PyQt5.QtGui import QIcon, QPixmap from PyQt5.QtMultimedia import QMediaPlayer, QMediaContent from PyQt5.QtWidgets import QApplication, QMainWind…

GPT-3.5——从 人工智障 到 大人工智障

有人说&#xff0c;GPT是从人工智障到人工智能的蜕变&#xff0c;但是。。。 我认为&#xff0c;GPT是从 人工智障 到 大人工智障 的退化。。。 从 人工智障 到 大人工智障 GPT-3.5学术介绍No.1---- 西红柿炒钢丝球基本信息详细制作方法材料步骤 幕后花絮 No.2---- 顶尖数学家…

MySQL高级篇——MySQL架构篇1(Linux下MySQL8的安装与使用)

目录 0 安装前0.1 Linux系统及工具的准备0.2 查看是否安装过MySQL0.3 MySQL的卸载 1 MySQL8的Linux版安装1.1 MySQL的4大版本1.2 下载MySQL指定版本1.3 CentOS7下检查MySQL依赖1.4 CentOS7下MySQL安装过程 2 MySQL登录2.1 首次登录2.2 修改密码2.3 设置远程登录 3 MySQL 8 的密…

Jmeter —— 自动录制脚本

目录 目录 1、Jmeter配置 1.1新增一个线程组 1.2Jmeter中添加HTTP代理 1.3配置HTTP代理服务器 2、录制脚本 2.1配置本地代理 2.2访问页面进行操作 2.3脚本处理 1、Jmeter配置 1.1新增一个线程组 1.2Jmeter中添加HTTP代理 1.3配置HTTP代理服务器 修改端口 修改Target…

嵌入式基础知识-中断处理过程

本篇来介绍中断&#xff0c;这是计算机系统以及嵌入式系统的重要概念。 1 中断基本概念 中断是CPU对系统发生的某个事件作出的一种反应。 中断的一些基本概念&#xff1a; 中断源&#xff1a;引起中断的事件称为中断源中断请求&#xff1a;中断源向CPU提出处理的请求称为中断…

excel文本函数篇1

本期主要介绍LEFT、RIGHT、MID以及后面加B的情况&#xff1a; &#xff08;1&#xff09;后缀没有B&#xff1a;一个字节代表一个中文字符 &#xff08;2&#xff09;后缀有B&#xff1a;两个字节代表一个中文字符 &#xff08;3&#xff09;LEFT()从前面开始找&#xff0c;RI…

【腾讯云 TDSQL-C Serverless 产品体验】基于腾讯云轻量服务器以及 TDSQL-C 搭建 LNMP WordPress 博客系统

文章目录 一、前言二、数据库发展与云原生数据库2.1 数据库发展简介2.2 云原生数据库简介2.2.1 云数据库与云原生数据库区别 三、腾讯云 TDSQL-C 数据库3.1 什么是腾讯云 TDSQL-C 数据库3.2 为什么推出 TDSQL-C 数据库&#xff1f;传统 MySQL 架构存在较多痛点3.2.1 传统 MySQL…

【Linux】cpolar+JuiceSSH实现手机端远程连接Linux服务器

文章目录 1. Linux安装cpolar2. 创建公网SSH连接地址3. JuiceSSH公网远程连接4. 固定连接SSH公网地址5. SSH固定地址连接测试 处于内网的虚拟机如何被外网访问呢?如何手机就能访问虚拟机呢? cpolarJuiceSSH 实现手机端远程连接Linux虚拟机(内网穿透,手机端连接Linux虚拟机) …

Amelia预订插件:WordPress企业级预约系统

并非所有WordPress预订插件都像他们所设计的那样。其中一些缺乏运行高效预约操作所需的功能&#xff0c;而其他一些则看起来陈旧过时。您不需要其中任何一个&#xff0c;但Amelia预订插件似乎希望确保所有用户都对功能和风格感到满意。 在这篇Amelia企业级预约系统插件评测中&…