文章目录
- 一. 相关配置
- 1.1 拷贝编译好的jar包到Flink的lib目录
- 1.2 拷贝guava包,解决依赖冲突
- 1.3 配置环境变量
- 二. YARN Session模式下启动Flink SQL
- 2.1 解决依赖问题
- 2.2 启动yarn-session
- 2.3 在yarn session模式下启动flink sql
- 三. 测试
- 四. 异常问题汇总
一. 相关配置
1.1 拷贝编译好的jar包到Flink的lib目录
# 同步到其它节点
cd /home/hudi-0.12.0/packaging/hudi-flink-bundle/target/
cp ./hudi-flink1.14-bundle-0.12.0.jar /home/flink-1.14.5/lib
1.2 拷贝guava包,解决依赖冲突
# 同步到其它节点
[root@hp5 lib]# cd /home/hadoop-3.3.2/share/hadoop/common/lib
[root@hp5 lib]# ls | grep guava
guava-27.0-jre.jar
hadoop-shaded-guava-1.1.1.jar
listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
[root@hp5 lib]#
[root@hp5 lib]# cp guava-27.0-jre.jar /home/flink-1.14.5/lib/
1.3 配置环境变量
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
二. YARN Session模式下启动Flink SQL
2.1 解决依赖问题
# 同步到其它节点
cp /home/hadoop-3.3.2/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.2.jar /home/flink-1.14.5/lib/
cd /home/flink-1.14.5/lib/
2.2 启动yarn-session
/home/flink-1.14.5/bin/yarn-session.sh -d
2.3 在yarn session模式下启动flink sql
/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session
三. 测试
set sql-client.execution.result-mode=tableau;
create table t1(
uuid varchar(20) primary key not enforced,
name varchar(10),
ts timestamp(3),
`pt` varchar(20)
)
PARTITIONED BY(`pt`)
WITH (
'connector' = 'hudi',
'path' = '/user/hudi_data/t1',
'table.type'= 'MERGE_ON_READ'
);
insert into t1 values
('id1', 'Test1', timestamp '1970-01-01 00:00:01', 'par1');
四. 异常问题汇总
测试Hudi集成Flink的时候,遇到各种各样的问题,后面把Flink的配置文件中关于高可用的部分注释掉就没问题了。
但是Flink的 master在哪个节点,yarn-session也必须运行在哪个节点。
修改Flink配置文件:
注释掉Flink高可用的配置,并增加web运行时监视端口号
jobmanager.rpc.address: hp5
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4096m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 3
parallelism.default: 1
#high-availability: zookeeper
#high-availability.cluster-id: /flink-cluster
#high-availability.storageDir: hdfs://hp5:8020/vmcluster/flink/ha/
#high-availability.zookeeper.quorum: hp5:2181,hp6:2181,hp7:2181
#state.backend: filesystem
#state.checkpoints.dir: hdfs://hp5:8020/vmcluster/flink-checkpoints
#state.savepoints.dir: hdfs://hp5:8020/vmcluster/flink-savepoints
#jobmanager.execution.failover-strategy: region
#jobmanager.archive.fs.dir: hdfs://hp5:8020/vmcluster/completed-jobs/
#historyserver.archive.fs.dir: hdfs://hp5:8020/vmcluster/completed-jobs/
classloader.check-leaked-classloader: false
# Web 的运行时监视器端口
rest.port: 9081