Hudi集成Flink-写入方式

news2025/1/12 1:10:11

文章目录

  • 一、CDC 入湖
    • 1.1、[开启binlog](https://blog.csdn.net/wuxintdrh/article/details/130142601)
    • 1.2、创建测试表
      • 1.2.1、创建mysql表
      • 1.2.2、将 binlog 日志 写入 kafka
        • 1、使用 mysql-cdc 监听 binlog
        • 2、kafka 作为 sink表
        • 3、写入sink 表
      • 1.2.3、将 kakfa 数据写入hudi
        • 1、kafak 作为 源表,flinksql 消费kafka
  • 二、Bulk Insert (离线批量导入)
    • 2.1、buck_insert 案例
      • 2.2.1、mysql jdbc
      • 2.2.2、hudi buck_insert
      • 2.2.3、buck insert 写入hudi 表
  • 三、Index Bootstrap (全量接增量)
    • 3.1、Index Bootstrap 案例
  • 四、Changelog Mode
    • 4.1、基本特性
    • 4.2、可选配置参数
    • 4.3、案例
  • 五、Append Mode
    • 5.1、Inline Clustering (只支持 Copy_On_Write 表)
    • 5.2、Async Clustering
    • 5.3、Clustering Plan Strategy
  • 六、Bucket Index
    • 6.1、WITH 参数
    • 6.2、与 state index 对比
    • 七、Rate Limit (限流)

使用版本

hudi-0.12.1
flink-1.15.2

一、CDC 入湖

CDC(change data capture) 保证了完整数据变更,目前主要有两种方式
在这里插入图片描述

  • 1、直接使用 cdc-connector 对接 DBbinlog数据导入。优点是不依赖消息队列,缺点是对 db server 造成压力
  • 2、对接 cdc format 消费 kafka 数据导入 hudi,优点是可扩展性强,缺点是依赖 kafka。

接下来我们主要介绍 第二种方式

1.1、开启binlog

1.2、创建测试表

1.2.1、创建mysql表

create database hudi_test;
use hudi_test;

-- 建表
create table person(
	id int  auto_increment primary key,
	name varchar(30),
	age int
);

1.2.2、将 binlog 日志 写入 kafka

mysql-cdc 参考: https://chbxw.blog.csdn.net/article/details/119841434
使用cdc-2.x

1、使用 mysql-cdc 监听 binlog

wget https://maven.aliyun.com/repository/central/com/ververica/flink-connector-mysql-cdc/2.0.0/flink-connector-mysql-cdc-2.0.0.jar

Flink SQL> 
create database hudi_test;
use hudi_test;

create table person_binlog (
  id bigint not null,
  name string,
  age int,
  primary key (id) not enforced
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'chb1',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'flinktest',
  'table-name' = 'person'
);

使用mysql-cdc 报错

NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/Thr

原因在于sql和非sql connector实现中对于shaded guava的处理不同,
使用 flink-sql-connector-mysql-cdc 替代 flink-connector-mysql-cdc 而且2.0.0版本不行,提升到2.2.1版本解决问题。

2、kafka 作为 sink表

-- 为了显示更清晰
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.

Flink SQL> SET 'execution.runtime-mode' = 'streaming';      
[INFO] Session property has been set.

Flink SQL> 
create table person_binlog_sink_kafka(
  id bigint not null,
  name string,
  age int not null,
  primary key (id) not enforced -- 主键
) with (
  'connector' = 'upsert-kafka' -- kafka connector upsert-kafka
  ,'topic' = 'cdc_mysql_person_sink'
  ,'properties.zookeeper.connect' = 'chb1:2181'
  ,'properties.bootstrap.servers' = 'chb1:9092'
  ,'key.format' = 'json'
  ,'value.format' = 'json'
);

在这里插入图片描述

3、写入sink 表

Flink SQL> 
insert into person_binlog_sink_kafka
select * from person_binlog;

1.2.3、将 kakfa 数据写入hudi

1、kafak 作为 源表,flinksql 消费kafka

Flink SQL> 
create table person_binlog_source_kafka (
  id bigint not null,
  name string,
  age int not null
) with (
  'connector' = 'kafka'
  ,'topic' = 'cdc_mysql_person_sink'
  ,'properties.bootstrap.servers' = 'chb1:9092'
  ,'format' = 'json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'testGroup'
);

2、创建hudi目标表

Flink SQL> 
create table person_binlog_sink_hudi (
  id bigint not null,
  name string,
  age int not null,
  primary key (id) not enforced -- 主键
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://chb3:8020/hudi_db/person_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'insert'
);

3、将 kafka 中数据 写入 hudi

Flink SQL> 
insert into person_binlog_sink_hudi 
select * from person_binlog_source_kafka;

插入20条数据,产生332个小文件, 小文件问题
在这里插入图片描述

二、Bulk Insert (离线批量导入)

如果数据源来源于其他系统,可以使用批量导入数据功能,快速的将存量数据导入hudi。

  • 1、消除了序列化和数据合并。由于跳过了重复数据删除,用户需要保证数据的唯一性。
  • 2、在批处理执行模式下效率更高。默认情况下,批处理执行模式将输入记录按分区路径进行排序,并写入Hudi,避免频繁切换文件句柄导致写性能下降。
Flink SQL> 
SET 'execution.runtime-mode' = 'streaming';  // 默认是流模式
SET 'execution.checkpointing.interval' = '0';  // 关闭checkpoint, batch模式不支持checkpoint
  • 3、bulk_insert 的并行度由write.tasks指定。并行度会影响小文件的数量。理论上,bulk_insert的并行度是桶的数量(特别是,当每个桶写入到最大文件大小时,它将切换到新的文件句柄。最后,文件的数量 >= write.bucket_assign.tasks
参数名是否必选默认值备注
write.operationtrueupsert设置为bulk_insert 开启功能
write.tasksfalse4bulk_insert 的并行度, 文件数量 >= write.bucket_assign.tasks
write.bulk_insert.shuffle_inputfalsetrue写入前是否根据输入字段(分区) shuffle。启用此选项将减少小文件的数量,但可能存在数据倾斜的风险
write.bulk_insert.sort_inputfalsetrue写入前是否根据输入字段(partition字段)对数据进行排序。当一个 write task写多个分区时,启用该选项将减少小文件的数量
write.sort.memoryfalse128排序算子 可用的 managed memory 默认128 MB 在这里插入图片描述

2.1、buck_insert 案例

2.2.1、mysql jdbc

参考: https://chbxw.blog.csdn.net/article/details/119479967

Flink SQL> 
create table person (
  id int not null,
  name string,
  age int not null,
  primary key (id) not enforced
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://chb1:3306/flinktest',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'person'
);

报错 java.lang.Integer cannot be cast to java.lang.Long, 由于 mysql 中 person的id 是 int 类型, 转为 flink 对应的是 int, 但是在flink建表时 字段为 bigint.所以报错。
在这里插入图片描述

2.2.2、hudi buck_insert

Flink SQL> 
create table person_binlog_sink_hudi_buck (
  id int not null,
  name string,
  age int not null,
  primary key (id) not enforced -- 主键
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://chb3:8020/hudi_db/person_binlog_sink_hudi_buck',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'bulk_insert'  -- 配置 buck_insert 模式 
);

2.2.3、buck insert 写入hudi 表

Flink SQL> 
insert into person_binlog_sink_hudi_buck  
select * from person;

一次性的。
在这里插入图片描述

三、Index Bootstrap (全量接增量)

在上面使用 buck_insert 已经完成全量数据导入,接下来, 用户可以通过Index Bootstrap 功能实时插入增量数据,保证数据不重复。

WITH 参数

参数名是否必选默认值备注
index.bootstrap.enabledtruefalse此功能开启,Hudi 表中剩余的记录将一次性加载到Flink状态
index.partition.regexfalse*优化选择。设置正则表达式以过滤分区。默认情况下,所有分区都加载到flink状态

使用方法

  • CREATE TABLE创建一条与Hudi表对应的语句。 注意这个 table.type 配置必须正确。
  • 设置index.bootstrap.enabled = true来启用index bootstrap功能
  • flink-conf.yaml文件中设置Flink checkpoint的容错机制,设置配置项execution.checkpointing.tolerable-failed-checkpoints = n(取决于Flink checkpoint执行时间)
  • 等待直到第一个checkpoint成功,表明index bootstrap完成。
  • index bootstrap完成后,用户可以退出并保存savepoint(或直接使用外部 checkpoint`)。
  • 重启任务,并且设置index.bootstrap.enablefalse

注意:

  • 索引引导是一个阻塞过程,因此在索引引导期间无法完成checkpoint。
  • index bootstrap由输入数据触发。 用户需要确保每个分区中至少有一条记录。
  • index bootstrap是并发执行的。用户可以在日志文件中通过finish loading the index under partition以及Load record form file观察index bootstrap的进度。
  • 第一个成功的checkpoint表明 index bootstrap已完成。 从checkpoint恢复时,不需要再次加载索引。

3.1、Index Bootstrap 案例

Flink SQL> 
create table person_binlog_sink_hudi_boot (
  id bigint not null,
  name string,
  age int not null,
  primary key (id) not enforced -- 主键
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://chb3:8020/hudi_db/person_binlog_sink_hudi_buck',
  'table.type' = 'MERGE_ON_READ',
  'index.bootstrap.enabled'='true'
);

index bootstrap表接cdc表

Flink SQL> 
insert into person_binlog_sink_hudi_boot 
select * from person_binlog;

四、Changelog Mode

4.1、基本特性

Hudi可以保留消息的所有中间变化(I / -U / U / D),然后通过flink的状态计算消费,从而拥有一个接近实时的数据仓库ETL管道(增量计算)。 Hudi MOR表以行的形式存储消息,支持保留所有更改日志(格式级集成)。 所有的更新日志记录可以使用Flink流阅读器。

4.2、可选配置参数

参数名是否必选默认值备注
changelog.enabledfalsefalse默认是关闭的,即upsert语义,只有合并的消息被确保保留,中间的更改可以被合并。
设置为true以支持消费所有的更改

注意

  • 不管格式是否存储了中间更改日志消息,批(快照)读取仍然合并所有中间更改。

  • 在设置changelog.enabletrue时,中间的变更也是 best effort: 异步压缩任务将更新日志记录合并到一条记录中,因此如果流源不及时消费,则压缩后只能读取每个key的合并记录。

    • 解决方案是通过调整压缩策略,比如压缩选项:compress.delta_commitscompression.delta_seconds,为读取器保留一些缓冲时间。

4.3、案例

在这里插入图片描述

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; -- table tableau  changelog
[INFO] Session property has been set.

Flink SQL> SET 'execution.runtime-mode' = 'streaming';      
[INFO] Session property has been set.

Flink SQL> 
create table person2(
  id bigint not null,
  name string,
  age int not null,
  primary key (id) not enforced -- 主键
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://chb3:8020/hudi_db/person2',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4',
  'changelog.enabled' = 'true'
);


-- 插入数据
insert into person2 values (1, 'chb', 23);
insert into person2 values (1, 'chb', 24);

select * from person2;

在这里插入图片描述
创建非changelog表, url 指向person2同一路径


Flink SQL> 
create table person3(
  id bigint not null,
  name string,
  age int not null,
  primary key (id) not enforced -- 主键
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://chb3:8020/hudi_db/person2',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'
);

结果只有最新数据

在这里插入图片描述

报错 Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.FileInputFormat

拷贝 hadoop-mapreduce-client-core.jar 到 flink lib.

五、Append Mode

从 0.10 开始支持

对于 INSERT 模式:

  • MOR 默认会 apply 小文件策略: 会追加写 avro log 文件
  • COW 每次直接写新的 parquet 文件,没有小文件策略

Hudi 支持丰富的 Clustering 策略,优化 INSERT 模式下的小文件问题。

5.1、Inline Clustering (只支持 Copy_On_Write 表)

参数名是否必选默认值备注
write.insert.clusterfalsefalse是否在写入时合并小文件,COW 表默认 insert 写不合并小文件,开启该参数后,每次写入会优先合并之前的小文件(不会去重),吞吐会受影响 (用的比较少,建议使用 Async Clustering)

5.2、Async Clustering

​ 从 0.12 开始支持

WITH 参数

名称Required默认值说明
clustering.schedule.enabledfalsefalse是否在写入时定时异步调度 clustering plan,默认关闭
clustering.delta_commitsfalse4调度 clsutering plan 的间隔 commits,clustering.schedule.enabled 为 true 时生效
clustering.async.enabledfalsefalse是否异步执行 clustering plan,默认关闭
clustering.tasksfalse4Clustering task 执行并发
clustering.plan.strategy.target.file.max.bytesfalse1024 * 1024 * 1024Clustering 单文件目标大小,默认 1GB
clustering.plan.strategy.small.file.limitfalse600小于该大小的文件才会参与 clustering,默认600MB
clustering.plan.strategy.sort.columnsfalseN/A支持指定特殊的排序字段
clustering.plan.partition.filter.modefalseNONE支持NONE:不做限制RECENT_DAYS:按时间(天)回溯SELECTED_PARTITIONS:指定固定的 partition
clustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默认 2 天

5.3、Clustering Plan Strategy

​ 支持定制化的 clustering 策略。

名称Required默认值说明
clustering.plan.partition.filter.modeFALSENONE支持·
NONE:不做限制·
RECENT_DAYS:按时间(天)回溯·
SELECTED_PARTITIONS:指定固定的 partition
clustering.plan.strategy.daybased.lookback.partitionsFALSE2RECENT_DAYS 生效,默认 2 天
clustering.plan.strategy.cluster.begin.partitionFALSEN/ASELECTED_PARTITIONS 生效,指定开始 partition(inclusive)
clustering.plan.strategy.cluster.end.partitionFALSEN/ASELECTED_PARTITIONS 生效,指定结束 partition(incluseve)
clustering.plan.strategy.partition.regex.patternFALSEN/A正则表达式过滤 partitions
clustering.plan.strategy.partition.selectedFALSEN/A显示指定目标 partitions,支持逗号 , 分割多个 partition

六、Bucket Index

默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当数据量比较大的时候,state的存储开销可能成为瓶颈,bucket 索引通过固定的 hash 策略,将相同 key 的数据分配到同一个 fileGroup 中,避免了索引的存储和查询开销。

6.1、WITH 参数

名称Required默认值说明
index.typefalseFLINK_STATE设置 BUCKET 开启 Bucket 索引功能
hoodie.bucket.index.hash.fieldfalse主键可以设置成主键的子集
hoodie.bucket.index.num.bucketsfalse4默认每个 partition 的 bucket 数,当前设置后则不可再变更

6.2、与 state index 对比

  • (1)bucket index 没有 state 的存储计算开销,性能较好
  • (2)bucket index 无法扩容 buckets,state index 则可以依据文件的大小动态扩容
  • (3)bucket index 不支持跨 partition 的变更(如果输入是 cdc 流则没有这个限制),state index 没有限制

七、Rate Limit (限流)

有许多用户将完整的历史数据集与实时增量数据一起放到消息队列中的用例。然后使用 flink 将队列中的数据从最早的偏移量消费到hudi中。
消费历史数据集具有以下特点:

  • 1)瞬时吞吐量巨大
  • 2)严重无序(随机写分区)。

这将导致写入性能下降和吞吐量故障。对于这种情况,可以打开速度限制参数以确保流的平滑写入。

名称Required默认值说明
write.rate.limitfalse0默认禁止限流

参考:
https://hudi.apache.org/cn/docs/hoodie_deltastreamer/#flink-ingestion

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/423954.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ERTEC200P-2 PROFINET设备完全开发手册(4-2)

4.2 XHIF接口实验 4.2.1写入单片机固件 首先按照下图连接设备 用JLINK 20Pin JTAG连接4 Pin SWD可以采用转接板 单片机的参考程序是用ST的CubeIDE生成的,目前的版本是1.7.0。打开安装后的CubeIDE,在菜单中选择“File->Import“ 选择“Existing Proj…

企业信息化建设都包括哪些方面?

随着大数据技术的发展,时代的发展要求企业转变管理模式、建立信息化管理机制,同时也是提高工作、管理效率,促进企业战略性发展的重要保障。 企业信息化是将信息技术应用于企业发展实践中的一个动态过程,即通过挖掘先进的管理理念…

高可靠多层板制造服务再获认可!华秋荣获创想三维优秀质量奖

4月10日,创想三维2023年度战略供应商大会在惠州成功举办,高可靠多层板制造商华秋出席了本次活动并取得了《优秀质量奖》一奖项。 大会现场,创想三维董事长陈春指出公司的持续发展与供应链高质量的交付息息相关。作为创想三维主力PCB供应商&am…

【亲测有效】更新了WIN11之后 右键无 新建WORD,PPT,EXCEL 选项 问题 解决方案

原本正常的正版系统,在昨天4月自动更新安装之后,发现右键找 不到新建文档了,word,ppt,excel都不见了。 看了网上大神的方法 Win11安装了Office右键没有新建Excel选项怎么办? - 知乎 可以解决一部分 官方解决方案,亲…

李宏毅2021春季机器学习课程视频笔记14-Transformer

Transformer Transformer实际上就是变形金刚,其与Bert实际类似。其实际上就是一个Sequence-to-Sequence的模型,其输出的长度并不是由人为指定,而是由机器自行确定。 Transformer的基本结构,如上图所示,主要由一个Encod…

AS01/AS02/AS03 创建定制屏幕字段

本文简介:在sap标准屏幕上,增加客户定制的屏幕字段。 操作步骤: 1、在创建资产卡片AS01时,界面需要输入客户定制的字段,如下图方框所示 2、查看增强点,事务码:SMOD AIST002 3、创建增强项目…

M1 M2上能安装上Autocad 2024 Mac 中文版吗 autocad m1 m2版本有啦 终于支持Ventura 13x了

AutoCAD是一款强大的工具,适合于各种领域的设计和绘图。它具有二维图形和三维建模功能、多种文件格式支持、自定义命令和样式、批处理和脚本等特点,可以帮助用户实现高质量的设计和建模。同时,还支持云端存储和共享,方便用户随时随…

【Linux】-- 进程概念的引入

目录 硬件 冯诺依曼体系结构 冯诺依曼体系结构推导 重点概念 网络数据流向 软件 操作系统(Operator System - OS) 概念 定位 进程内核数据结构PCB(task_struct) 通过系统调用创建进程-fork初始 fork基本用法 使用if进行分流 查看运行效果 …

Python每日一练(20230417)

目录 1. 最大间距 🌟🌟🌟 2. Z 字形变换 🌟🌟 3. 买卖股票的最佳时机 II 🌟🌟 🌟 每日一练刷题专栏 🌟 Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练…

通过Android Studio自制.9.png启动页图片 - uniapp启动界面设置Android启动图片设置

效果图 实现步骤 下载安装JDK 参考Oracle官网: https://www.oracle.com/java/technologies/javase/upgrade.html 再跳转到JavaSE Upgrade下载页面:Java Downloads | Oracle 配置JDK: 假设jdk安装位置是D:\Program Files\Java\jdk-18.0.1.1 …

CDH6.3.2大数据集群生产环境安装(八)之各组件参数调优,yarn参数调优,hdfs参数调优等

yarn资源调优 主要涉及到了ResourceManager、NodeManager这几个概念,相关的优化也要紧紧围绕着这几方面来开展。这里还有一个Container的概念,现在可以先把它理解为运行map/reduce task的容器 28.1. 内存 堆栈等配置  原值  调优值

java并发编程之美第一章并发编程基础(读书笔记)

1–50面 java并发编程基础 什么是线程 进程: 是代码在数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位 线程: 是进程的一次执行路径,一个进程至少有一个线程,进程中的多个线程共享进程的资源. 线程是CPU分配的基本单位 栈: 每个线程都有自己的栈资源,用…

02_CCC3.0数字钥匙_SPAKE2+执行流程

02_CCC3.0数字钥匙_SPAKE2执行流程Vehicle OEM Server:派生salt、L和w0;这三个参数需要服务器给到车辆端的,所以需要在服务器事先生成。用于与车辆端的做SPAKE2验证。DK Scrypt(pwd, s, Nscrypt, r, p, dkLen); z0 DK[0 : 320]…

判断环形链表是否有环??返回环形链表的入口点

给你一个链表的头节点 head ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定链表中的环,评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置(…

IDEA插件-MavenHapler

1.安装Maven Helper Maven Helper 是 IntelliJ IDEA 中的一个插件,可以帮助您管理 Maven 依赖项。它可以帮助您更容易地删除不再需要的依赖项,查看依赖项的冲突,以及执行其他有关 Maven 依赖项的操作。 打开 IDEA 设置页面: 在插…

gpu超频超额训练导致电源关机

详细原理参见: 离显卡功耗实标还有多远?峰值功耗与电源关系终结篇 – FCPOWERUP极电魔方 和 【硬件科普】如何合理科学的选择电源功率的大小?_哔哩哔哩_bilibili 本人的1250w电源截图: 分析: 12V输出分了6路&#xff…

游戏逆向_Android读写游戏内容

一、背景 Android外挂的实现,需要涉及相应游戏内容的读写。读写的游戏内容包括代码和数据 针对不同的读写对象,通用的步骤就是寻找对象地址(位置)→获取相应权限→读写。下面将更详细介绍下相关实现。 二、实现方式 实现方式可…

了解最新的Android开发趋势和技术的秘诀

前言 当前,Android开发市场已经相当成熟,并且在全球范围内都非常活跃。Android是全球最受欢迎的移动操作系统之一,自Android开源以来,它已经改变了移动技术。市场上大量的企业和开发者都在积极地跟进、深入研究和开发Android系统…

大数据Flink进阶(十二):Flink本地模式开启WebUI

文章目录 Flink本地模式开启WebUI 一、在Flink 项目中添加本地模式 WebUI的依赖

2023 Java面试题短期突击攻略,已帮助400+位程序员成功拿到offer

2023春招已经开始一段时间了,很多同学会问Java面试八股文有必要背吗? 我的回答是:很有必要。你可以讨厌这种模式,但你一定要去背,因为不背你就进不了大厂。 国内的互联网面试,恐怕是现存的、最接近科举考试…