文章目录
- 一. 通过Flink SQL将MySQL数据写入Hudi
- 二. 模拟Flink任务异常
- 2.1 手工停止job
- 2.2 指定checkpoint来恢复数据
- 2.3 整个yarn-session上的任务恢复
- 三. 模拟源端异常
- 3.1 手工关闭源端 MySQL 服务
- 3.2 FLink任务查看
- FAQ:
- 1. checkpoint未写入数据
- 2. checkpoint 失败
- 3. 手工取消Flink job后,checkpoint文件自动删除
- 参考:
一. 通过Flink SQL将MySQL数据写入Hudi
启动Yarn Session
$FLINK_HOME/bin/yarn-session.sh -jm 16384 -tm 16384 -d 2>&1 &
/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session
Flink SQL 代码:
-- 设置checkpoint的时间间隔
set execution.checkpointing.interval=60sec;
-- 设置任务结束后不清空checkpoint文件,便于后续恢复
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
-- 同时只能有一个checkpoint进程
set execution.checkpointing.max-concurrent-checkpoints=1;
CREATE TABLE flink_mysql_cdc1 (
id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
name varchar(100)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hp8',
'port' = '3306',
'username' = 'root',
'password' = 'abc123',
'database-name' = 'test',
'table-name' = 'mysql_cdc',
'server-id' = '5409-5415',
'scan.incremental.snapshot.enabled'='true'
);
CREATE TABLE flink_hudi_mysql_cdc1(
id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
name varchar(100)
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc1',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled' = 'true',
'hoodie.datasource.write.recordkey.field' = 'id',
'write.precombine.field' = 'name',
'compaction.async.enabled' = 'true',
'hive_sync.enable' = 'true',
'hive_sync.table' = 'flink_hudi_mysql_cdc1',
'hive_sync.db' = 'test',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://hp5:9083',
'hive_sync.conf.dir'='/home/apache-hive-3.1.2-bin/conf'
);
set table.exec.resource.default-parallelism=4;
insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;
Flink web界面:
2000w数据初始化已经完成
checkpoint日志量真的多
hdfs查看checkpoint日志量
二. 模拟Flink任务异常
2.1 手工停止job
在Flink web界面将Flink SQL任务手工结束掉
2.2 指定checkpoint来恢复数据
查找最近的checkpoint:
代码:
set 'execution.savepoint.path'='hdfs://hp5:8020/vmcluster/flink-checkpoints/a2874606453b4aebfdaca2f627355f99/chk-23';
insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;
2.3 整个yarn-session上的任务恢复
待测试:
如果是整个yarn-session异常,也可以启动yarnsession的时候指定checkpoint。
$FLINK_HOME/bin/yarn-session.sh -jm 8192 -tm 8192 -d -s hdfs://hp5:8020/vmcluster/flink-checkpoints/c12eb538c2e8965d2d94c170b67641f2/chk-1/_metadata
/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session
三. 模拟源端异常
3.1 手工关闭源端 MySQL 服务
service mysqld stop
3.2 FLink任务查看
Flink可以自己重试,这个还是比较不错,无需人工干预。
等mysql启动成功之后,任务又可以继续衔接上。
FAQ:
1. checkpoint未写入数据
别人的checkpoint
我的checkpoint
看来是我的checkpoint都没成功
修改checkpoint时间间隔:
-- 修改前:
set execution.checkpointing.interval=10sec;
-- 修改后:
set execution.checkpointing.interval=60sec;
2. checkpoint 失败
报错信息:
Checkpoint Coordinator is suspending.
解决方案:
把yarn-session的资源由8G提升到16G问题解决。
对于一些大表,最好还是先通过Spark进行初始化,然后在接增量。
3. 手工取消Flink job后,checkpoint文件自动删除
https://developer.aliyun.com/ask/435979
http://events.jianshu.io/p/032396543ceb
在网上看到的资源都是针对代码级别的,没有看到Flink SQL级别的
还是得上官网查找
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
参考:
- https://blog.csdn.net/qq_31866793/article/details/103069646