目录
0. 相关文章链接
1. 去重参数
2. 并发参数
2.1. 参数说明
2.2. 案例演示
3. 压缩参数
3.1. 参数说明
3.2. 案例演示
4. 文件大小
4.1. 参数说明
4.2. 案例演示
5. Hadoop 参数
Flink可配参数官网地址:All Configurations | Apache Hudi
0. 相关文章链接
Hudi文章汇总
1. 去重参数
通过如下语法设置主键:
-- 设置单个主键
create table hoodie_table (
f0 int primary key not enforced,
f1 varchar(20),
...
) with (
'connector' = 'hudi',
...
)
-- 设置联合主键
create table hoodie_table (
f0 int,
f1 varchar(20),
...
primary key(f0, f1) not enforced
) with (
'connector' = 'hudi',
...
)
名称 | 说明 | 默认值 | 备注 |
hoodie.datasource.write.recordkey.field | 主键字段 | -- | 支持主键语法 PRIMARY KEY 设置,支持逗号分隔的多个字段 |
precombine.field (0.13.0 之前版本为 write.precombine.field) | 去重时间字段 | -- | record 合并的时候会按照该字段排序,选值较大的 record 为合并结果;不指定则为处理序:选择后到的 record |
2. 并发参数
2.1. 参数说明
名称 | 说明 | 默认值 | 备注 |
write.tasks | writer 的并发,每个 writer 顺序写 1~N 个 buckets | 4 | 增加并发对小文件个数没影响 |
write.bucket_assign.tasks | bucket assigner 的并发 | Flink的并行度 | 增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件(小 bucket) 数 |
write.index_bootstrap.tasks | Index bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 checkpoint,因此需要设置多一些的 checkpoint 失败容忍次数 | Flink的并行度 | 只在 index.bootstrap.enabled 为 true 时生效 |
read.tasks | 读算子的并发(batch 和 stream) | 4 | |
compaction.tasks | online compaction 算子的并发 | writer 的并发 | online compaction 比较耗费资源,建议走 offline compaction |
2.2. 案例演示
可以flink建表时在with中指定,或Hints临时指定参数的方式:在需要调整的表名后面加上 /*+ OPTIONS() */
insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */
select * from sourceT;
3. 压缩参数
3.1. 参数说明
在线压缩的参数,通过设置 compaction.async.enabled =false关闭在线压缩执行,但是调度compaction.schedule.enabled 仍然建议开启,之后通过离线压缩直接执行 在线压缩任务 阶段性调度的压缩 plan。
名称 | 说明 | 默认值 | 备注 |
compaction.schedule.enabled | 是否阶段性生成压缩 plan | true | 建议开启,即使compaction.async.enabled 关闭的情况下 |
compaction.async.enabled | 是否开启异步压缩 | true | 通过关闭此参数关闭在线压缩 |
compaction.tasks | 压缩 task 并发 | 4 | |
compaction.trigger.strategy | 压缩策略 | num_commits | 支持四种策略:num_commits、time_elapsed、num_and_time、 num_or_time |
compaction.delta_commits | 默认策略,5 个 commits 压缩一次 | 5 | |
compaction.delta_seconds | 3600 | ||
compaction.max_memory | 压缩去重的 hash map 可用内存 | 100(MB) | 资源够用的话建议调整到 1GB |
compaction.target_io | 每个压缩 plan 的 IO 上限,默认 5GB | 500(GB) |
3.2. 案例演示
CREATE TABLE t3(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t3',
'compaction.async.enabled' = 'true',
'compaction.tasks' = '1',
'compaction.schedule.enabled' = 'true',
'compaction.trigger.strategy' = 'num_commits',
'compaction.delta_commits' = '2',
'table.type' = 'MERGE_ON_READ'
);set table.dynamic-table-options.enabled=true;
insert into t3
select * from sourceT/*+ OPTIONS('rows-per-second' = '5')*/;
注意:如果没有按照 Hudi(12):Hudi集成Flink之sql-client方式 中的 1.3章节 yarn-session模式解决hadoop依赖冲突问题,那么无法compaction生成parquet文件,报错很隐晦,在Exception中看不到,要搜索TaskManager中关于compaction才能看到报错。
4. 文件大小
4.1. 参数说明
Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用。在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小。目前只有 log 文件的写入大小可以做到精确控制,parquet 文件大小按照估算值。
名称 | 说明 | 默认值 | 备注 |
hoodie.parquet.max.file.size | 最大可写入的 parquet 文件大小 | 120 * 1024 * 1024 默认 120MB (单位 byte) | 超过该大小切新的 file group |
hoodie.logfile.to.parquet.compression.ratio | log文件大小转 parquet 的比率 | 0.35 | hoodie 统一依据 parquet 大小来评估小文件策略 |
hoodie.parquet.small.file.limit | 在写入时,hudi 会尝试先追加写已存小文件,该参数设置了小文件的大小阈值,小于该参数的文件被认为是小文件 | 104857600 默认 100MB (单位 byte) | 大于 100MB,小于 120MB 的文件会被忽略,避免写过度放大 |
hoodie.copyonwrite.record.size.estimate | 预估的 record 大小,hoodie 会依据历史的 commits 动态估算 record 的大小,但是前提是之前有单次写入超过 hoodie.parquet.small.file.limit 大小,在未达到这个大小时会使用这个参数 | 1024 默认 1KB (单位 byte) | 如果作业流量比较小,可以设置下这个参数 |
hoodie.logfile.max.size | LogFile最大大小。这是在将Log滚转到下一个版本之前允许的最大大小。 | 1073741824 默认1GB (单位 byte) |
4.2. 案例演示
CREATE TABLE t4(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t4',
'compaction.tasks' = '1',
'hoodie.parquet.max.file.size'= '10000',
'hoodie.parquet.small.file.limit'='5000',
'table.type' = 'MERGE_ON_READ'
);set table.dynamic-table-options.enabled=true;
insert into t4
select * from sourceT /*+ OPTIONS('rows-per-second' = '5')*/;
5. Hadoop 参数
从 0.12.0 开始支持,如果有跨集群提交执行的需求,可以通过 sql 的 ddl 指定 per-job 级别的 hadoop 配置。
名称 | 说明 | 默认值 | 备注 |
hadoop.${you option key} | 通过 hadoop.前缀指定 hadoop 配置项 | -- | 支持同时指定多个 hadoop 配置项 |
注:其他Hudi相关文章链接由此进 -> Hudi文章汇总