1. Environment preparation
Copy the compiled Hudi bundle jar into Flink's lib directory:
cp hudi-flink1.13-bundle-0.12.0.jar /opt/module/flink-1.13.2/lib
2. Using the sql-client
2.1 Edit the flink-conf.yaml configuration
The Hudi Flink writer commits data when a checkpoint completes, so checkpointing must be enabled:
vim /opt/module/flink-1.13.2/conf/flink-conf.yaml
state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop1:9000/ckps
state.backend.incremental: true
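The same settings can also be applied per session from the sql-client instead of editing flink-conf.yaml; a minimal sketch, assuming the same checkpoint directory on HDFS:
SET 'state.backend' = 'rocksdb';
SET 'execution.checkpointing.interval' = '30000';
SET 'state.checkpoints.dir' = 'hdfs://hadoop1:9000/ckps';
SET 'state.backend.incremental' = 'true';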
2.2 Starting in yarn-session mode
1. Startup
(1) Start the Hadoop cluster first, then launch Flink through yarn-session:
/opt/module/flink-1.13.2/bin/yarn-session.sh -d
(2) Then start the sql-client:
/opt/module/flink-1.13.2/bin/sql-client.sh embedded -s yarn-session
2. Writing data
Table mode materializes the result in memory and displays it as a regular, paginated table. Enable it with:
SET 'sql-client.execution.result-mode' = 'table'; -- default
Changelog mode does not materialize or visualize the result; instead, the continuous query produces a stream of inserts (+) and retractions (-):
SET 'sql-client.execution.result-mode' = 'changelog';
Tableau mode is closer to a traditional database: it prints the result directly to the screen in tabular form. What exactly is shown depends on the job execution mode (execution.type):
SET 'sql-client.execution.result-mode' = 'tableau';
-- create a Hudi table
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t1', -- base path of the Hudi table
  'table.type' = 'MERGE_ON_READ' -- defaults to COW
);
-- insert data
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
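After the insert is committed (which happens on the next checkpoint), the data can be read back from the same sql-client session to verify the write. A minimal check; the streaming-read variant using the Hudi options read.streaming.enabled and read.start-commit is shown only as a sketch:
-- snapshot query of the freshly written table
SELECT * FROM t1;
-- optional: continuously read new commits as they arrive
SELECT * FROM t1 /*+ OPTIONS('read.streaming.enabled' = 'true', 'read.start-commit' = 'earliest') */;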
3. Coding in IDEA
3.1 Environment preparation
1. Manually install the dependency:
mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.11 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar
2. Write the code
import org.apache.flink.contrib.streaming.state.{EmbeddedRocksDBStateBackend, PredefinedOptions}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

import java.util.concurrent.TimeUnit

object HudiExample {
  def main(args: Array[String]): Unit = {
    // val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Use RocksDB with incremental checkpoints as the state backend
    val embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true)
    // embeddedRocksDBStateBackend.setDbStoragePath("file:///E:/rocksdb")
    embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)
    env.setStateBackend(embeddedRocksDBStateBackend)

    // Checkpoint configuration: Hudi commits data when a checkpoint completes
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10), CheckpointingMode.EXACTLY_ONCE)
    val checkpointConfig = env.getCheckpointConfig
    checkpointConfig.setCheckpointStorage("hdfs://hadoop1:9000/ckps")
    checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(10))
    checkpointConfig.setTolerableCheckpointFailureNumber(5)
    checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1))
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    val sTableEnv = StreamTableEnvironment.create(env)

    // Datagen source producing one random row per second
    sTableEnv.executeSql("CREATE TABLE sourceT (\n" +
      "  uuid varchar(20),\n" +
      "  name varchar(10),\n" +
      "  age int,\n" +
      "  ts timestamp(3),\n" +
      "  `partition` varchar(20)\n" +
      ") WITH (\n" +
      "  'connector' = 'datagen',\n" +
      "  'rows-per-second' = '1'\n" +
      ")")

    // Hudi MERGE_ON_READ sink table
    sTableEnv.executeSql("create table t2(\n" +
      "  uuid varchar(20),\n" +
      "  name varchar(10),\n" +
      "  age int,\n" +
      "  ts timestamp(3),\n" +
      "  `partition` varchar(20)\n" +
      ")\n" +
      "with (\n" +
      "  'connector' = 'hudi',\n" +
      "  'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t2',\n" +
      "  'table.type' = 'MERGE_ON_READ'\n" +
      ")")

    // Submit the continuous insert job (executeSql submits it asynchronously)
    sTableEnv.executeSql("insert into t2 select * from sourceT")
  }
}
3. Submit and run:
bin/flink run -t yarn-per-job -c com.my.example.HudiExample ./myjars/HudiExample-1.0-SNAPSHOT-jar-with-dependencies.jar
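Once the job is running, the data it writes can also be checked from the sql-client by registering a table over the same path and reading it in streaming mode. A sketch under that assumption; read.streaming.enabled and read.start-commit are standard Hudi Flink options:
create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t2',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true', -- stream new commits instead of a one-shot snapshot
  'read.start-commit' = 'earliest'   -- start from the earliest commit
);
select * from t2;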
4. Core parameter settings
Configurable Flink parameters: https://hudi.apache.org/docs/configurations#FLINK_SQL
4.1 Deduplication parameters
The primary key is set with the following syntax:
-- single primary key
create table hoodie_table (
  f0 int primary key not enforced,
  f1 varchar(20),
  ...
) with (
  'connector' = 'hudi',
  ...
)
-- composite primary key
create table hoodie_table (
  f0 int,
  f1 varchar(20),
  ...
  primary key(f0, f1) not enforced
) with (
  'connector' = 'hudi',
  ...
)
Name | Description | Default | Notes
hoodie.datasource.write.recordkey.field | Record key field(s) | -- | Can be set through the PRIMARY KEY syntax; multiple comma-separated fields are supported
precombine.field (write.precombine.field before 0.13.0) | Deduplication ordering field | -- | When records are merged they are sorted by this field and the record with the larger value wins; if not set, processing order is used, i.e. the record that arrives later wins
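A minimal sketch that combines the two options; the table name, path, and the choice of ts as the precombine field are placeholders for illustration:
create table dedup_demo (
  uuid varchar(20),
  name varchar(10),
  ts timestamp(3),
  primary key(uuid) not enforced  -- record key
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/dedup_demo',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'ts' -- named precombine.field from 0.13.0 onwards, see the table above
);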
4.2 Concurrency parameters
Name | Description | Default | Notes
write.tasks | Parallelism of the writer; each writer writes 1~N buckets sequentially | 4 | Increasing the parallelism does not affect the number of small files
write.bucket_assign.tasks | Parallelism of the bucket assigner | Flink parallelism | Increasing it also increases the number of buckets written concurrently, which indirectly increases the number of small files (small buckets)
write.index_bootstrap.tasks | Parallelism of the index bootstrap operator; higher parallelism speeds up the bootstrap phase. Since the bootstrap phase blocks checkpointing, a higher tolerable checkpoint failure count should be configured | Flink parallelism | Only takes effect when index.bootstrap.enabled is true
read.tasks | Parallelism of the read operators (batch and streaming) | 4 |
compaction.tasks | Parallelism of the online compaction operator | Parallelism of the writer | Online compaction is resource-intensive; offline compaction is recommended
Example
These parameters can be specified in the WITH clause when creating the table in Flink, or overridden temporarily with SQL hints by appending /*+ OPTIONS() */ after the table that needs adjusting:
CREATE TABLE sourceT (
uuid varchar(20),
name varchar(10),
age int,
ts timestamp(3),
`partition` varchar(20)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
create table t2(
uuid varchar(20),
name varchar(10),
age int,
ts timestamp(3),
`partition` varchar(20)
)
with (
'connector' = 'hudi',
'path' = '/tmp/hudi_flink/t2',
'table.type' = 'MERGE_ON_READ'
);
insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */
select * from sourceT;
(Figure: execution of the insert job with the hint options, omitted.)