Prerequisites
Flink runtime version 1.13.1; Iceberg dependency version 1.0.0.
Dependencies
Setting up the Flink SQL runtime environment is omitted here.
Pay attention to version compatibility: a mismatched Flink/Iceberg version combination may make the table impossible to read or write.
<!-- Iceberg dependency needed for Flink to read and write Iceberg tables -->
<properties>
    <iceberg.version>1.0.0</iceberg.version>
</properties>

<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-1.13</artifactId>
    <version>${iceberg.version}</version>
</dependency>
Hadoop resources (local Windows setup)
1. Download the Hadoop client (2.7.7), e.g. from a domestic mirror site (note that some mirrors do not carry older releases).
2. Download the winutils.exe matching the Hadoop version from the winutils repository, place it in Hadoop's bin directory, and copy the bundled hadoop.dll to C:\Windows\System32.
3. Set the HADOOP_HOME environment variable and add %HADOOP_HOME%\bin to the system PATH.
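As a quick sanity check before launching the job, a minimal sketch like the following (not part of the original post) fails fast when the Windows prerequisites above are missing:

//Minimal sketch: verify the Windows Hadoop prerequisites described above.
static void checkHadoopOnWindows() {
    String hadoopHome = System.getenv("HADOOP_HOME");
    java.io.File winutils = hadoopHome == null ? null : new java.io.File(hadoopHome, "bin/winutils.exe");
    if (winutils == null || !winutils.isFile()) {
        throw new IllegalStateException("HADOOP_HOME is not set or winutils.exe is missing under %HADOOP_HOME%\\bin");
    }
}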
tEnv.executeSql("insert into hadoop_catalog.inf_db.inf_customer select * from inf_customer_source ");
Problems this produced:
java.io.FileNotFoundException: File file:/D:/test/icetest/iceberg/hadoop/warehouse/integration_catalog/inf_db/inf_customer_source/metadata/version-hint.text does not exist
Caused by: org.apache.flink.table.catalog.exceptions.TableAlreadyExistException: Table (or view) inf_db.inf_customer_source already exists in Catalog hadoop_catalog.
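A likely cause of the FileNotFoundException (an inference, not stated in the original): once hadoop_catalog is the current catalog, an unqualified source-table name resolves inside the Iceberg catalog, so Iceberg tries to load a table that was never created there and looks for its version-hint.text. Fully qualifying the Kafka source table, which lives in Flink's built-in default catalog, avoids this:

//Sketch: reference the Kafka source table through Flink's default catalog/database
//instead of letting it resolve against the current (Iceberg) catalog.
tEnv.executeSql(
        "insert into hadoop_catalog.inf_db.inf_customer " +
        "select * from default_catalog.default_database.inf_customer_source");

The TableAlreadyExistException on re-runs can be avoided with CREATE TABLE IF NOT EXISTS, which the test code below already uses.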
Official documentation
Link to the related description
Test code
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class WriteIcebergSql_2 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 9999);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.disableOperatorChaining();
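//Iceberg commits data files when a Flink checkpoint completes, so written rows only become readable after each checkpoint (here every 30s)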
env.enableCheckpointing(30000);
env.setParallelism(1);
//System.setProperty("HADOOP_USER_NAME", "hadoop"); //without this you may get a "no read/write permission" error
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
//configuration for streaming (real-time) reads
Configuration configuration = tableEnv.getConfig().getConfiguration();
//enable the SQL OPTIONS hint (dynamic table options)
configuration.setBoolean("table.dynamic-table-options.enabled", true);
//1. Kafka source table
String source_table = "CREATE TABLE source_table( \n" +
" id STRING,\n" +
" name STRING,\n" +
" age INT,\n" +
" addr STRING,\n" +
" eff_date TIMESTAMP(6),\n" +
" phone STRING\n" +
")WITH( \n" +
"'connector' = 'kafka', \n" +
"'topic' = 'k2i_topic_1', \n" +
"'properties.bootstrap.servers' = '127.0.0.1:9092', \n" +
"'properties.group.id' = 'testGroup', \n" +
"'scan.startup.mode' = 'latest-offset', " +
"'format' = 'debezium-json' \n" +
")\n";
tableEnv.executeSql(source_table);
//2. Create the Iceberg catalog
tableEnv.executeSql("CREATE CATALOG hadoop_catalog\n" +
" WITH (\n" +
" 'type' = 'iceberg',\n" +
" 'warehouse' = 'file:///D:/data/iceberg',\n" +
" 'catalog-type' = 'hadoop'\n" +
" )");
//3. Use the catalog
tableEnv.useCatalog("hadoop_catalog");
//4. Create the database
tableEnv.executeSql("CREATE DATABASE if not exists inf_db");
//5. Use the database
tableEnv.useDatabase("inf_db");
//6. Create the Iceberg table
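//'format-version'='2' together with 'write.upsert.enabled'='true' and the declared primary key is what enables row-level upserts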
String sinkDDL = "CREATE TABLE if not exists hadoop_catalog.inf_db.inf_customer " +
"( \n" +
" id STRING,\n" +
" name STRING,\n" +
" age INT,\n" +
" addr STRING,\n" +
" eff_date TIMESTAMP(6),\n" +
" phone STRING,\n" +
"primary key (id) not enforced \n" +
") \n" +
" WITH ( \n" +
" 'format-version'='2', \n" +
" 'write.upsert.enabled'='true' \n" +
" )";
tableEnv.executeSql(sinkDDL);
//Read the Kafka source table and write the CDC (create/update/delete) JSON data into Iceberg
//tableEnv.executeSql("insert into hadoop_catalog.inf_db.inf_customer select * from default_catalog.default_database.source_table ");
tableEnv.executeSql("SELECT * FROM hadoop_catalog.inf_db.inf_customer /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/").print(); ;
}
}
Test data
{"op":"c" , "op_time":"2023-06-01 13:56:49", "after":{"id":"13" , "name":"张三"},"TABLE_KEY":"id","table":"inf_test"}
{"op":"u" , "op_time":"2023-06-01 13:56:49", "before": {"id":"13" , "name":"张三"},"after":{"id":"13" , "age":"26" , "eff_date":"2023-05-31 16:16:49"},"TABLE_KEY":"USER_ID","table":"inf_test"}
{"op":"u" , "op_time":"2023-06-01 13:56:49", "before":{"id":"13" , "name":"张三","age":"26" , "eff_date":"2023-05-31 16:16:49"} , "after":{"id":"13" , "addr":"CHINA-GD-GUANGZHOU" , "EFF_DATE":"2023-06-13 16:16:49","phone":"152222222"},"TABLE_KEY":"USER_ID","table":"inf_test"}
Test procedure
Send the three test messages above to the Kafka topic k2i_topic_1 one at a time and watch the output of the streaming query printed by the test code.
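A minimal producer sketch for feeding the test messages (not from the original post; it only assumes the broker address and topic declared in the source-table DDL):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendTestData {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            //first test message; send the update messages the same way
            String msg = "{\"op\":\"c\",\"op_time\":\"2023-06-01 13:56:49\","
                    + "\"after\":{\"id\":\"13\",\"name\":\"张三\"},"
                    + "\"TABLE_KEY\":\"id\",\"table\":\"inf_test\"}";
            producer.send(new ProducerRecord<>("k2i_topic_1", msg));
            producer.flush();
        }
    }
}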
Summary
Looking at the metadata from the Hadoop side, an upsert is in fact stored as a delete plus an insert: the existing row is marked as deleted and the modified row is written as new data. In other words, Iceberg does not perform partial (in-place) updates.
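This can also be seen from the table's snapshot history. The sketch below (assuming the Iceberg Java API and the warehouse path used above; not part of the original post) prints each snapshot's operation and summary, which for v2 upserts includes counters such as added data files and added delete files:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class InspectSnapshots {
    public static void main(String[] args) {
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "file:///D:/data/iceberg");
        Table table = catalog.loadTable(TableIdentifier.of("inf_db", "inf_customer"));
        for (Snapshot s : table.snapshots()) {
            //each upsert shows up as a new snapshot rather than an in-place change
            System.out.println(s.snapshotId() + " " + s.operation() + " " + s.summary());
        }
    }
}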