Iceberg从入门到精通系列之十七:Apache InLong往Iceberg同步数据

news2024/12/27 1:09:06

Iceberg从入门到精通系列之十七:Apache InLong往Iceberg同步数据

  • 一、概览
  • 二、版本支持
  • 三、依赖项
  • 四、SQL API 用法
  • 五、多表写入
  • 六、动态表名映射
  • 七、动态建库、建表
  • 八、动态schema变更
  • 九、Iceberg Load 节点参数
  • 十、数据类型映射

一、概览

  • Apache Iceberg是一种用于大型分析表的高性能格式。

二、版本支持

提取节点版本
IcebergIceberg:0.12.x,0.13.x

三、依赖项

<dependency>
    <groupId>org.apache.inlong</groupId>
    <artifactId>sort-connector-iceberg</artifactId>
    <version>1.7.0</version>
</dependency>

四、SQL API 用法

在 flink 中创建Iceberg表,我们推荐使用Flink SQL Client,因为它更便于用户理解概念。

Step.1 在hadoop环境下启动一个独立的flink集群。

# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the flink standalone cluster
./bin/start-cluster.sh

Step.2 启动flink SQL客户端。

flink-runtime在 iceberg 项目中创建了一个单独的模块来生成一个捆绑的 jar,可以直接由 flink SQL 客户端加载。

如果想要flink-runtime手动构建捆绑的 jar,只需构建inlong项目,它将在<inlong-root-dir>/inlong-sort/sort-connectors/iceberg/target。

默认情况下,iceberg 包含用于 hadoop 目录的 hadoop jars。如果我们要使用 hive 目录,我们需要在打开 flink sql 客户端时加载 hive jars。幸运的是,apache inlong将 一个捆绑的hive jar打包进入Iceberg。所以我们可以如下打开sql客户端:

# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar shell

Step.3 在当前 Flink 目录中创建表

默认情况下,我们不需要创建目录,只需使用内存目录即可。在目录中如果catalog-database.catalog-table不存在,会自动创建。这里我们只是加载数据。

在 Hive 目录中管理的表

下面的 SQL 会在当前 Flink 目录中创建一个 Flink 表,映射到 iceberg 目录中default_database.iceberg_table管理的 iceberg 表。由于目录类型默认是 hive,所以这里不需要放catalog-type.

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

如果要创建 Flink 表映射到 Hive 目录中管理的不同Iceberg表(例如hive_db.hive_iceberg_table在 Hive 中),则可以创建 Flink 表,如下所示:

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'catalog-database'='hive_db',
    'catalog-table'='hive_iceberg_table',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

将记录写入 Flink 表时,如果底层目录数据库(hive_db上例中)不存在,则会自动创建它。

在 hadoop 目录中管理的表

以下 SQL 将在当前 Flink 目录中创建一个 Flink 表,该表映射到default_database.flink_tablehadoop 目录中管理Iceberg表。

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_prod',
    'catalog-type'='hadoop',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

Step.6 向Iceberg表中插入数据

INSERT INTO `flink_table` 
    SELECT 
    `id` AS `id`,
    `d` AS `name`
    FROM `source_table`

在自定义Catalog中管理的表

以下 SQL 将在当前 Flink 目录中创建一个 Flink 表,该表映射到default_database.flink_table自定义目录中管理的Iceberg表。

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='custom_prod',
    'catalog-type'='custom',
    'catalog-impl'='com.my.custom.CatalogImpl',
     -- More table properties for the customized catalog
    'my-additional-catalog-config'='my-value',
     ...
);

五、多表写入

目前 Iceberg 支持多表同时写入,需要在 FLINK SQL 的建表参数上添加 ‘sink.multiple.enable’ = ‘true’ 并且目标表的schema 只能定义成 BYTES 或者 STRING ,以下是一个建表语句举例:

CREATE TABLE `table_2`(
    `data` STRING)
WITH (
    'connector'='iceberg-inlong',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://localhost:8020/hive/warehouse',
    'sink.multiple.enable' = 'true',
    'sink.multiple.format' = 'canal-json',
    'sink.multiple.add-column.policy' = 'TRY_IT_BEST',
    'sink.multiple.database-pattern' = '${database}',
    'sink.multiple.table-pattern' = 'test_${table}'
);

要支持多表写入同时需要设置上游数据的序列化格式(通过选项 ‘sink.multiple.format’ 来设置, 目前仅支持 [canal-json|debezium-json])。

六、动态表名映射

Iceberg 在多表写入的时可以自定义映射的数据库名和表名的规则,可以填充占位符然后添加前后缀来修改映射的目标表名称。 Iceberg Load Node 会解析 ‘sink.multiple.database-pattern’ 作为目的端的 数据库名, 解析 ‘sink.multiple.table-pattern’ 作为目的端的表名,占位符是从数据中解析出来的,变量是严格通过 ‘${VARIABLE_NAME}’ 来表示, 变量的取值来自于数据本身, 即可以是通过 ‘sink.multiple.format’ 指定的某种 Format 的元数据字段, 也可以是数据中的物理字段。 关于 ‘topic-parttern’ 的例子如下:

  • ‘sink.multiple.format’ 为 ‘canal-json’:
  • ‘topic-pattern’ 为 ‘{database}_${table}’, 提取后的 Topic 为 ‘inventory_products’ (‘database’, ‘table’ 为元数据字段, ‘id’ 为物理字段)
  • ‘topic-pattern’ 为 ‘{database} t a b l e {table} table{id}’, 提取后的 Topic 为 ‘inventory_products_111’ (‘database’, ‘table’ 为元数据字段, ‘id’ 为物理字段)

七、动态建库、建表

Iceberg在多表写入时遇到不存在的表和不存在的库时会自动创建数据库和数据表,并且支持在运行过程中新增捕获额外的表入库。 默认的Iceberg表参数为:‘format-version’ = ‘2’、‘write.upsert.enabled’ = ‘true’'、‘engine.hive.enabled’ = ‘true’

八、动态schema变更

Iceberg在多表写入时支持同步源表结构变更到目标表(DDL同步),支持的schema变更如下:

在这里插入图片描述

九、Iceberg Load 节点参数

选项是否必须默认值类型描述
connector必需(none)String指定要使用的连接器,应该是’iceberg’
catalog-type必需hiveStringhive或hadoop用于内置目录,或为使用catalog-impl的自定义目录实现未设置
catalog-name必需(none)String目录名称
catalog-database必需(none)String目录名称
catalog-table必需(none)String在Iceberg目录中管理的数据库名称
catalog-impl自定义custom可选(none)String如果未设置,则必须设置完全限定的类名自定义目录实现catalog-type
cache-enabled可选trueBoolean是否启用目录缓存,默认值为true
urihive catalog可选(none)StringHive元存储的thrift URI
clientshive catalog可选2IntegerHive Metastore客户端池大小,默认值为2
warehousehive catalog或hadoop catalog可选(none)String对于Hive目录,是Hive仓库位置,如果既不设置hive-conf-dir指定包含hive-site.xml配置文件的位置也不添加正确hive-site.xml的类路径,用户应指定此路径。对于hadoop目录,HDFS目录存放元数据文件和数据文件
hive-conf-dirhive catalog可选(none)Stringhive-site.xml包含将用于提供自定义 Hive 配置值的配置文件的目录的路径。如果同时设置和创建Iceberg目录时,hive.metastore.warehouse.dirfrom <hive-conf-dir>/hive-site.xml(或来自类路径的 hive 配置文件)的值将被该值覆盖。warehousehive-conf-dirwarehouse
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}。
sink.multiple.enable可选falseBoolean是否开启多路写入
sink.multiple.schema-update.policy可选TRY_IT_BESTEnum遇到数据中schema和目标表不一致时的处理策略TRY_IT_BEST:尽力而为,尽可能处理,处理不了的则忽略IGNORE_WITH_LOG:忽略并且记录日志,后续该表数据不再处理THROW_WITH_STOP:抛异常并且停止任务,直到用户手动处理schema不一致的情况
sink.multiple.pk-auto-generated可选falseBoolean是否自动生成主键,对于多路写入自动建表时当源表无主键时是否将所有字段当作主键
sink.multiple.typemap-compatible-with-spark可选falseBoolean是否适配spark的类型系统,对于多路写入自动建表时是否需要适配spark的类型系统

十、数据类型映射

Iceberg数据类型详细信息。这里介绍了加载数据如何将 Iceberg 类型转换为 Flink 类型。

Flink SQL类型Iceberg类型
CHARSTRING
VARCHARSTRING
STRINGSTRING
BOOLEANBOOLEAN
BINARYFIXED(L)
VARBINARYBINARY
DECIMALDECIMAL(P,S)
TINYINTINT
SMALLINTINT
INTEGERINT
BIGINTLONG
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP
TIMESTAMP_LTZTIMESTAMPTZ
INTERVAL-
ARRAYLIST
MULTISETMAP
MAPMAP
ROWSTRUCT
RAW-

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/761044.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习算法】主成分分析(PCA)

主成分分析(PCA) PCA(Principal Component Analysis) 是实现数据降维的一种算法。正如其名&#xff0c;假设有一份数据集&#xff0c;每条数据的维度是d&#xff0c;PCA通过分析这d个维度的前k个主要特征(这k个维度在原有d维特征的基础上重新构造出来&#xff0c;且是全新的正交…

SpringBoot+React学科竞赛管理系统 附带详细运行指导视频

文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码 一、项目演示 项目演示地址&#xff1a; 视频地址 二、项目介绍 项目描述&#xff1a;这是一个基于SpringBootReact框架开发的学科竞赛管理系统。首先&#xff0c;这是一个前后端分离的项目&#xff0c;代码简洁…

初学者怎么学习c++(合集)

学习c方法1 找一本好的书本教材&#xff0c;辅助看教学视频。好的教材&#xff0c;可以让你更快更好的进入C/C的世界。在校学生的话&#xff0c;你们的教材通常都是不错的。如果是自学&#xff0c;推荐使用谭浩强出的C/C经典入门教材。看视频是学习比较直观的方式。建议先看课本…

从零开始理解Linux中断架构(20)--关于二级中断控制-链式chained Handler

二级中断控制器是个双重角色,在上级中断控制器看来他是个中断设备,在连接到他的下级设备来看,他是个中断控制器。所以处理完成基本的中断控制器管理功能:映射本地中断,还要多个动作:修改上级中断的默认irq Handler,向上级中断设置自己的链式中断处理函数。 中断控制的层…

Springboot实现过滤器

一、导言 在Spring Boot中&#xff0c;过滤器是一种用于对HTTP请求进行预处理和后处理的组件。相较于拦截器&#xff0c;过滤器属于Servlet规范的一部分&#xff0c;它能够在请求进入Web容器之前或返回给客户端之前进行操作。 要在Spring Boot中实现过滤器&#xff0c;可以按…

指针进阶(万字深层次指针解析)

❤️ 作者简介 &#xff1a;对纯音乐情有独钟的阿甘 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识&#xff0c;对纯音乐有独特的喜爱 &#x1f4d7; 日后方向 : 偏向于CPP开发以及大数据方向&#xff0c;如果你也感兴趣的话欢迎关注博主&#xff0c;期待更新 指针进阶 …

Java正则表达式,不定期更新

Java正则表达式 1. 匹配数字&#xff08;包含负数、小数&#xff09;2. 匹配不是纯数字和纯字母且需要8位以上的密码3. 密码&#xff1a;字母、数字、符号&#xff08;_-*.,!#符号可自定义&#xff09;三选二4. 密码&#xff1a;必须包含大写、小写、数字、符号&#xff08;_-*…

车道线检测|利用边缘检测的原理对车道线图片进行识别

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 这两个都是博主在学习Linux操作系统过程中的记录&#xff0c;希望对大家的学习有帮助&#xff01; 操作系统Operating Syshttps://blog.csdn.net/yu_cblog/category_12165502.html?spm1001.2014.3001.5482Linux S…

工程监测振弦采集仪的解决方案案例解释

振弦采集仪是一种用于测量结构物的振动状态和应力变化的高精度仪器&#xff0c;广泛应用于建筑、桥梁、隧道、地铁等工程领域。以下是一些常见的解决方案案例分析&#xff1a; 基础监测方案&#xff1a;对于大型建筑或桥梁工程&#xff0c;需要对基础进行实时监测。使用振弦采集…

System类 BigInterger BigDecimal

System类 常用方法和案例 exit&#xff1a; 退出当前程序 System.out.println("zhang"); // 0表示一个正常退出的状态 System.exit(0); System.out.println("cheng");System.arraycopy&#xff1a; 复制数组元素&#xff0c;比较适合底层的调用&#xf…

基于linux下的高并发服务器开发(第二章)- 2.2 进程状态转换

01 / 进程的状态 &#xff08;1&#xff09;三态模型 进程状态分为三个基本状态&#xff0c;即就绪态&#xff0c;运行态&#xff0c;阻塞态 &#xff08;2&#xff09;五态模型 在五态模型中&#xff0c;进程分为新建态&#xff0c;就绪态&#xff0c;运行态&#xff0c;阻…

mongodb练习---增删改查

环境&#xff1a; 1. 创建一个数据库 名字grade 2. 数据库中创建一个集合名字 class 3. 集合中插入若干数据 文档格式如下 &#xff5b;name:zhang,age&#xff1b;10,sex:m,hobby:[a,b,c]&#xff5d; hobby: draw sing dance basketball football pingpong compu…

Java8实战-总结2

Java8实战-总结2 基础知识方法和Lambda传递代码&#xff1a;一个例子从传递方法到Lambda 基础知识 方法和Lambda Scala和Groovy等语言的实践已经证明&#xff0c;让方法等概念作为一等值可以扩充程序员的工具库&#xff0c;从而让编程变得更容易。一旦程序员熟悉了这个强大的…

理解LLM中的ReAct

large language models (LLMs)大语言模型在语义理解和交互式决策方面有着不错的表现。ReAct在一次交互中循环使用推理和行动两个操作解决复杂问题&#xff0c;推理即利用模型自身语义理解能力&#xff0c;行动则利用模型以外的能力&#xff08;如计算、搜索最新消息&#xff0c…

OpenCv之滤波器

目录 一、卷积 二、方盒滤波与均值滤波 三、高斯滤波 四、中值滤波 五、双边滤波 一、卷积 图像卷积就是卷积核在图像上按行华东遍历像素时不断的相乘求和的过程 相关知识点: 步长:就是卷积核在图像上移动的步幅.(为充分扫描图片&#xff0c;步长一般为1)padding:指在图片…

跨服务器跨库数据联合查询

今天群里有人问多个数据源, 可否显示在一个dbgrid, 我感觉是可以的 应该有两种办法 1,如果你两个服务器上都是用的mssqlserver, 那比较好办的, 如果不同数据库,如一个mssql,一个oracle。 则需要ssms方式创建。 通过SSMS查看,如果Microsoft OLE DB Provider for …

docekr-compose搭建redis集群(三主三从)

硬件&#xff1a;三台主机 172.50.2.40 172.50.2.41 172.50.2.42 需求&#xff1a;不想让它随机分配主从关系。想指定主从关系&#xff0c;如下&#xff1a; 主节点&#xff1a;172.50.2.40:6379&#xff0c;从节点172.50.2.41:6378 主节点&#xff1a;172.50.2.41:6379&…

C波段可调谐激光器控制软件系统

花了两周时间&#xff0c;利用下班时间&#xff0c;设计了一个ITLA可调谐激光器控制系统&#xff0c;从硬件到软件。下面这个图片整套硬件系统&#xff0c;软件硬件都自己设计&#xff0c;可以定制&#xff0c;做到单片机问题也不大。相当于一套光源了 这是软件使用的界面&…

PyTorch中的torch.nn.Linear函数解析

torch.nn是包含了构筑神经网络结构基本元素的包&#xff0c;在这个包中&#xff0c;可以找到任意的神经网络层。这些神经网络层都是nn.Module这个大类的子类。torch.nn.Linear就是神经网络中的线性层&#xff0c;可以实现形如yXweight^Tb的加和功能。 nn.Linear()&#xff1a;…

Linux网络---网络预备

文章目录 计算机网络背景计算机网络协议网络传输基本流程 网络中的地址管理 一、计算机网络背景 独立模式: 计算机之间相互独立; 网络互联: 多台计算机连接在一起, 完成数据共享; 局域网LAN: 计算机数量更多了, 通过交换机和路由器连接在一起 广域网WAN: 将远隔千里的计算机…