FlinkSQL写入iceberg—Windows环境下

news2024/11/24 5:02:25

前置条件

Flink运行版本13.1,iceberg依赖版本:1.0.0

依赖

FlinkSQL运行环境略。

注意版本匹配,采用不合适版本可能导致无法读写

        <!-- Flink 操作Iceberg 需要的Iceberg依赖 -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-1.13</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

hadoop资源

1、hadoop 客户端下载(2.7.7) , 国内镜像下载网站(低版本安装包没有)。

2、 在winutils中下载相对应版本的winutils.exe文件放置于hadoop的bin目录下;且将其中的hadoop.dll放置于c:/windows/system32 下。

3、将环境变量HADOOP_HOME赋值, 且在HADOOP_HOME/bin赋值在PATH(系统)变量中。

tEnv.executeSql("insert into hadoop_catalog.inf_db.inf_customer select * from inf_customer_source ");

导致的问题:

java.io.FileNotFoundException: File file:/D:/test/icetest/iceberg/hadoop/warehouse/integration_catalog/inf_db/inf_customer_source/metadata/version-hint.text does not exist

Caused by:org.apache.flinktaleatalog,exceptions.TablealredyExistExcption Create breakpont : able or view) inf.d.inf.customer.source already exists in ataloghadoop_catalog.

官网相关

相关描述地址
在这里插入图片描述

测试代码

public class WriteIcebergSql_2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 9999);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.disableOperatorChaining();
        env.enableCheckpointing(30000);
        env.setParallelism(1);
        //System.setProperty("HADOOP_USER_NAME", "hadoop");//不设置这个,会提醒无读写权限
        //System.setProperty("HADOOP_USER_NAME", "hadoop");

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        //实时读取配置
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        //支持SQL 的 OPTIONS选项
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        //1、读取kafka源表
        String source_table = "CREATE TABLE source_table( \n" +
                " id STRING,\n" +
                " name STRING,\n" +
                " age INT,\n" +
                " addr STRING,\n" +
                " eff_date TIMESTAMP(6),\n" +
                " phone STRING\n" +
                ")WITH( \n" +
                "'connector' = 'kafka', \n" +
                "'topic' = 'k2i_topic_1', \n" +
                "'properties.bootstrap.servers' = '127.0.0.1:9092', \n" +
                "'properties.group.id' = 'testGroup', \n" +
                "'scan.startup.mode' = 'latest-offset', " +
                "'format' = 'debezium-json' \n" +
                ")\n";
        tableEnv.executeSql(source_table);

        //2、创建Catalog
        tableEnv.executeSql("CREATE CATALOG hadoop_catalog\n" +
                "  WITH (\n" +
                "   'type' = 'iceberg',\n" +
                "   'warehouse' = 'file:///D:/data/iceberg',\n" +
                "   'catalog-type' = 'hadoop'\n" +
                "  )");

        //3、使用当前Catalog
        tableEnv.useCatalog("hadoop_catalog");
        //4、创建数据库
        tableEnv.executeSql("CREATE DATABASE if not exists inf_db");
        //5、使用当前数据库
        tableEnv.useDatabase("inf_db");

        //6、创建iceberg表
        String sinkDDL = "CREATE TABLE if not exists hadoop_catalog.inf_db.inf_customer " +
                "( \n" +
                " id STRING,\n" +
                " name STRING,\n" +
                " age INT,\n" +
                " addr STRING,\n" +
                " eff_date TIMESTAMP(6),\n" +
                " phone STRING,\n" +
                "primary key (id) not enforced \n" +
                ") \n" +
                "  WITH ( \n" +
                " 'format-version'='2', \n" +
                " 'write.upsert.enabled'='true' \n" +
                " )";

        tableEnv.executeSql(sinkDDL);
        //读取Kafka源表,将CUD,JSOn格式数据写入Iceberg
        //tableEnv.executeSql("insert into hadoop_catalog.inf_db.inf_customer select * from default_catalog.default_database.source_table ");
        tableEnv.executeSql("SELECT * FROM hadoop_catalog.inf_db.inf_customer /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/").print(); ;
    }
}

测试数据

{“op”:“c” , “op_time”:“2023-06-01 13:56:49”, “after”:{“id”:“13” , “name”:“张三”},“TABLE_KEY”:“id”,“table”:“inf_test”}
{“op”:“u” , “op_time”:“2023-06-01 13:56:49”, “before”: {“id”:“13” , “name”:“张三”},“after”:{“id”:“13” , “age”:“26” , “eff_date”:“2023-05-31 16:16:49”},“TABLE_KEY”:“USER_ID”,“table”:“inf_test”}
{“op”:“u” , “op_time”:“2023-06-01 13:56:49”, “before”:{“id”:“13” , “name”:“张三”,“age”:“26” , “eff_date”:“2023-05-31 16:16:49”} , “after”:{“id”:“13” , “addr”:“CHINA-GD-GUANGZHOU” , “EFF_DATE”:“2023-06-13 16:16:49”,“phone”:“152222222”},“TABLE_KEY”:“USER_ID”,“table”:“inf_test”}

测试过程

在这里插入图片描述

{“op”:“c” , “op_time”:“2023-06-01 13:56:49”, “after”:{“id”:“13” , “name”:“张三”},“TABLE_KEY”:“id”,“table”:“inf_test”}

在这里插入图片描述

{“op”:“u” , “op_time”:“2023-06-01 13:56:49”, “before”: {“id”:“13” , “name”:“张三”},“after”:{“id”:“13” , “age”:“26” , “eff_date”:“2023-05-31 16:16:49”},“TABLE_KEY”:“USER_ID”,“table”:“inf_test”}

在这里插入图片描述

{“op”:“u” , “op_time”:“2023-06-01 13:56:49”, “before”:{“id”:“13” , “name”:“张三”,“age”:“26” , “eff_date”:“2023-05-31 16:16:49”} , “after”:{“id”:“13” , “addr”:“CHINA-GD-GUANGZHOU” , “EFF_DATE”:“2023-06-13 16:16:49”,“phone”:“152222222”},“TABLE_KEY”:“USER_ID”,“table”:“inf_test”}

在这里插入图片描述

小结

如果从hadoop控制端看元数据,其实质是先存储被修改数据本质上是被标记删除,再新增修改数据。 本质上不支持局部更新。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/648671.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

shell脚本变量-特殊变量

目录 特殊变量&#xff1a;$n案例需求 特殊变量&#xff1a;$#案例需求 特殊变量&#xff1a;$*、$案例需求 特殊变量&#xff1a;$&#xff1f;特殊变量&#xff1a;$$ 特殊变量&#xff1a;$n 语法 $n含义&#xff1a; 用于接收脚本文件执行时传入的参数 $0 用于获取当前脚…

机器人系统中的六大漏洞

原创 | 文 BFT机器人 在过去的几十年里&#xff0c;创新和技术导致机器人技术不断发展。 机器人系统正在迅速变得更加多产、复杂、有能力、智能化和网络化&#xff0c;并被用于越来越多的任务。 最初&#xff0c;机器人技术领域仅限于制造领域&#xff0c;但现在机器人可以与人…

KMP算法 - 确定有限状态自动机

KMP神在哪里&#xff1f; 子串匹配问题&#xff0c;拍脑袋一下子想出来的暴力解法大抵都是两重for循环&#xff0c;不断重复扫描主串&#xff0c;与子窜进行匹配&#xff0c;重复换句话讲就是冗余&#xff0c;会有很高的时间复杂度 我先前博客大作业发的模糊查找算法就是如此&…

三分钟告诉你如何和智能ai聊天

有一个名叫艾丽的年轻女孩&#xff0c;她生活在一个科技发达的未来世界。在这个世界里&#xff0c;人们与人工智能伙伴共同生活。艾丽对ai技术充满好奇&#xff0c;尤其是对ai对话聊天工具的运作方式。为了知道ai对话聊天工具怎么用&#xff0c;艾丽决定展开探索。 方案一&…

智能无线监测器的工作原理及应用优势

在现代工业生产中&#xff0c;设备状态监测对于确保生产的安全性、效率和可靠性至关重要。随着科技的不断发展&#xff0c;智能无线监测器成为工业设备状态监测的利器。本文将介绍智能无线监测器在工业领域中的应用&#xff0c;以及其带来的优势和价值。 图.设备状态监测&#…

智驾风向标|卷、乱、难,如何穿越多极分化新周期?

竞争越来越卷&#xff0c;企业越来越难&#xff0c;市场处于混乱期。对于大多数供应商来讲&#xff0c;穿越新周期的战略一定是先有规模&#xff08;市场份额&#xff09;&#xff0c;然后才是利润。 在6月8日召开的2023&#xff08;第十四届&#xff09;高工智能汽车开发者大…

8个你必须知道的Java8新特性,让你的代码变得优雅!

Java 8 是一次重大的发行版更新&#xff0c;引入了大量新特性和改进&#xff0c;以下是 Java 8 的主要特性&#xff1a; 文章目录 Java 8 是一次重大的发行版更新&#xff0c;引入了大量新特性和改进&#xff0c;以下是 Java 8 的主要特性&#xff1a;1.Lambda 表达式2.Stream …

云平台 stm32连接阿里云2023最新版本保姆级别教学只看这一篇就够了~

注册账号 阿里云平台点击直达 点击控制台 鼠标悬浮会出现下拉栏 点击物联网 再点击物联网平台 点击公共实例 新用户需要开通 开通需要五分钟的时间 点击创建产品 蓝色显眼字体 参数设置 仔细比对下图 点击查看产品详情 蓝色显眼字体 点击功能定义 点击编辑草图 实际上就是定义…

如何通过Android平台的API实现5G网络的支持 安卓系统版本和5g网络相关【一】

前面分享了两篇5G基带相关的移植修改博文。 安卓高通机型的基带移植 修改 编译的相关 增加信号 支持5G等【一】 安卓高通机型的基带移植 修改 编译的相关 增加信号 支持5G等【二】 今天的帖子聊聊安卓版本与5G网络与机型和修改之间相关的话题。众所周知&#xff0c;目前的机型…

如何获取签章定位信息

在合同系统中&#xff0c;经常需要在合同文档的特定位置放置签名/印章图片。在合同拟稿过程中&#xff0c;放置签名/印章图片只是为了获取一个精确的定位信息&#xff0c;在合同定稿阶段才根据拟稿阶段得到的位置信息&#xff0c;去插入真正的签名/印章。那么如何在合同系统中高…

基于OpenMV的疲劳驾驶检测系统的设计

一、前言 借助平台将毕业设计记录下来&#xff0c;方便以后查看以及与各位大佬朋友们交流学习。如有问题可以私信哦。 本文主要从两个方面介绍毕业设计&#xff1a;硬件&#xff0c;软件&#xff08;算法&#xff09;。以及对最后的实验结果进行分析。感兴趣的朋友们可以评论区…

创新案例|专注在线 协作平台 设计产品中国首家PLG独角兽企业蓝湖如何实现98%的头部企业渗透率

蓝湖起步于2015年&#xff0c;是一款服务于产品经理、设计师、工程师的产品设计研发在线协作工具&#xff0c; 2021年10月&#xff0c;蓝湖宣布完成C轮融资&#xff0c;融资额高达10亿人民币&#xff0c;称为中国2B市场中首家采用PLG发展的独角兽企业&#xff0c;并实现了从100…

conda虚拟环境列表错误module ‘attr‘ has no attribute ‘s‘的解决方法

列出虚拟环境列表命令&#xff1a;conda info -e 或者conda env listconda info -e 这个可以正常显示&#xff0c;conda env list却报错了&#xff0c;以前是没有问题的&#xff0c;因为这个命令我更习惯使用&#xff0c;所以这个小问题必须解决掉&#xff0c;或许其他读者可能…

undetected_chromedriver解决网页被检测

一、问题分析 selenium打开浏览器模仿人工操作是诸多爬虫工作者最万能的网页数据获取方式&#xff0c;但是在做自动化爬虫时&#xff0c;经常被检测到是selenium驱动。比如前段时间selenium打开维普高级搜索时得到的页面是空白页&#xff0c;懂车帝对selenium反爬也很厉害。 二…

【React】setState原理,SCU,不可变对象,Ref,受控组件,高阶组件,封装轮播图组件

❤️ Author&#xff1a; 老九 ☕️ 个人博客&#xff1a;老九的CSDN博客 &#x1f64f; 个人名言&#xff1a;不可控之事 乐观面对 &#x1f60d; 系列专栏&#xff1a; 文章目录 setState原理setState异步更新 SCU不可变对象RefRef获取DOMRef获取组件 非受控组件受控组件高阶…

word目录怎么自动生成,3个步骤轻松搞定!

案例&#xff1a;我在做策划案的时候&#xff0c;需要制作目录。我觉得自己手动制作目录很困难&#xff0c;通过word的可以自动生成目录&#xff0c;但是我不知道具体的操作方法。有没有小伙伴可以分享一下&#xff1f; 在制作任务书、书写论文的时候&#xff0c;经常需要添加…

vue+java+springboot企业办公人事oa办公管理系统2142g

本企业OA管理系统有管理员和用户。管理员功能有个人中心&#xff0c;用户管理&#xff0c;公告信息管理&#xff0c;客户关系管理&#xff0c;通讯录管理&#xff0c;日程安排管理&#xff0c;车辆信息管理&#xff0c;文件信息管理&#xff0c;工作日志管理&#xff0c;上班考…

0基础学习VR全景平台篇第42篇:编辑器底部菜单-分组管理

大家好&#xff0c;欢迎观看蛙色VR官方系列——后台使用课程&#xff01; 本期为大家带来蛙色VR平台&#xff0c;底部菜单—分组管理功能操作。 功能位置示意 一、本功能将用在哪里&#xff1f; 分组管理&#xff0c;指观看者可点击不同分组&#xff0c;查看不同类型全景内容…

learn C++ NO.9——string(2)

引言&#xff1a; 现在是北京时间的2023年6月15日早上的10点14分。时间过得飞快&#xff0c;现在已经大一的最后一个星期了。明天也是大一最后一次课&#xff0c;线下的实训课。线下实训内容为c语言二级的内容&#xff0c;对我来说跟学校的课效率太低下了&#xff0c;我还是比…

初识网络之再看udp协议

目录 一、端口号 1. 五元组 2. 端口号范围划分 3. 一些知名端口号 4. 进程与端口号 5. 两个常用网络工具 5.1 netstat 5.2 pidof 二、UDP协议 1. udp协议格式 2. udp报文解包 3. udp报文分用 4. udp的特点 5. 缓冲区 5.1 tcp缓冲区 5.2 udp缓冲区 6. 一些常见…