SeaTunnel StarRocks 连接器的使用及原理介绍

news2024/10/5 21:21:15

作者:毕博,马蜂窝数据平台负责人,StarRocks 活跃贡献者 & Apache SeaTunnel 贡献者

Apache SeaTunnel(以下简称 SeaTunnel)是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。StarRocks 通过与 SeaTunnel 的结合可以轻松实现 StarRocks 和不同数据源之间的数据交换。

SeaTunnel StarRocks 连接器简介

首先介绍一下数据同步平台 Apache SeaTunnel 的基本架构。

上图为 SeaTunnel 架构图,它提供了一套抽象的 API,包括 Source 、Transform、Sink API 等。基于这些抽象 API 可扩展出各种各样 Connector,其中基于 Source API 实现的 Source Connector 可以从左侧众多的数据源中读取数据,Transform Connector 用于实现数据 Pineline 中的数据转换处理,而 Sink Connector 可以将数据写入到右侧多种异构的数据源中。

同时,在运行时,SeaTunnel 还提供了一个翻译层,会将 Connector 实现的 Source 和 Sink API 翻译成引擎内部可运行的 API,使连接器可以在不同引擎上运行。目前 SeaTunnel 支持三种执行引擎,Spark、Flink 以及 SeaTunnel 自研执行引擎 Zeta Engine。

以上就是 SeaTunnel 的整体架构,可以看出,SeaTunnel 通过 Souce Connector 和 Sink Connector 实现与不同数据源进行链接。

上图描述了 SeaTunnel 和 StarRocks 结合的整体情况,SeaTunnel 提供 了Source Connector 和 Sink Connector。在 Source 部分 StarRocks 的表作为数据源,通过 SeaTunnelSource Connector 分布式地去提取 StarRocks 的数据;中间通过 SeaTunnel 提供的 Transform Connector 做一些分布式的数据处理和转换;后面的 StarRocks Sink 连接器则主要是把 SeaTunnel 内存里的数据,通过 StarRocks 提供的 Stream Load API,将数据导入到 StarRocks。

StarRocks Connector 功能特性

01 Source 功能特性

上图展示了目前 SeaTunnel Source 连接器当前支持的核心功能和特性,具体包括:

字段投影: 假设待读取表有多个字段,但是整个数据处理的 Pipeline 中只用到部分字段,那么针对这些部分列做数据同步就是字段投影的使用;

谓词下推: 谓词下推可以在数据扫描的时候过滤大量用不到的数据,通过下推到引擎,可以减少数据传输的数据量;

数据类型的自动映射: 是关于从 StarRocks 读取的数据类型与 SeaTunnel 内部数据类型的映射,后面会介绍目前支持的数据类型;

用户自定义分片: 是通过将待读取数据源的整个数据集拆分多个分片,每一个分片可以单独查询,并且在分片生成的阶段用户可以通过配置参数去控制分片生成的数量;

并行读取:首先, StarRocks 是支持并行的读取数据源,同时基于上面的数据源分片的切分,在读取时多个分片同时独立进行,最终通过并行读取加快读取的速率;

状态的恢复:Source 连接器读取阶段切分多个分片之后,连接器在读取过程中会定期将未进行读取分片信息保存在 State 中;这样在故障恢复的时候,结合 State 中分片的位置信息进行重新读取;

至少一次:得益于状态恢复,所以在读取端提供至少一次的语义;

Batch 模式:目前 StarRocks 连接器 Source 部分只支持批模式。

02 Sink 功能特性

Sink 的数据导入是基于 StarRocks 的 Stream Load 实现,因为 Stream Load 支持 CSV 和 JSON 两种文件格式,所以连接器在 Sink 端可以指定 CSV 和 JSON 两种文件格式进行导入。

写入时,考虑到写入的效率,所以会涉及数攒批,进行批量数据写入,而不是单条提交。

如果写入出现了异常 ,程序会自动判断是不是可恢复的异常,再基于一定的策略进行重试。

关于 CDC,目前 SeaTunnel 支持数据库的 changelog 捕捉,再结合 StarRocks 的 Stream Load 接口,可以对 StarRocks 的主键模型表进行数据变更,包括插入、更新和删除数据,所以连接器当前支持将 SeaTunnel 获取的 CDC 数据导入到 StarRocks 中来。

上述列表展示了目前 Source 和 Sink 连接器已经支持的功能和特性,希望在实际应用中可以给大家提供一些参考。

StarRocks Connector 读取原理

接下来我会着重介绍 StarRocks Connector 的读取原理,帮助大家更好地使用连接器功能。

01 字段投影

我们在读 StarRocks 表的时候,是可以选择部分字段读取的,比如这里我们有一个 StarRocks 表,有 4 个字段。但是实际同步使用到的字段只有 lo_orderkey、lo_number 两个字段,对于指定部分列的提取数据场景,可以在配置 Source 连接器的时候,通过 fields 参数来指定要查询的字段和数据读到 SeaTunnel 上面的字段数据类型。

​这样,在 SeaTunnel 真正执行的时候,就能只同步指定的字段,最终同步到 StarRocks 的数据如下图。

​通过减少投影字段可以降低同步过程网络、内存资源消耗,提升同步性能。

02 谓词下推

在实际使用中,我们可能需要过滤掉部分行的数据,如获取表中 linenum< 3 的部分数据。这时,我们可以在配置 Source 连接器的时候,通过配置 scan-filter 参数来过滤指定的部分行。

在实际执行中会将条件下推到 StarRocks,在 StarRocks 引擎内进行分区剪裁或分桶剪裁等优化处理。

这样,在读取数据阶段跳过全表扫描,可以大大减少数据处理的数据量,提升读取数据的效率。

03 字段投影&谓词下推实现

在具体实现上,通过用户在连接器中配置中指定的 fields 和 scan-filter 参数,连接器在程序中会自动生成适用于 StarRocks 的查询语句。如图,通过程序转换,最终生成 SQL。

04 并行读取:实现方案

并行读取 StarRocks 数据主要有两种方案,以 Flink 引擎读取 StarRocks 为例。方案一:直接通过 JDBC 协议读取数据,数据最终需要通过 FE 单节点将数据抽取上来,读取效率较低。

​方案二:进行分布式的设计,先通过 FE 查询对应 StarRocks 表的分片的元数据信息, 获取待读取数据的数据分布情况,再用分布式并行的方式直接从多个 BE 节点读取数据。

​这样做让整体的吞吐能力得到很大的提升,目前 StarRocks Connector 基于第二种方案。

05 并行读取:获取 StarRocks 的数据分布

Source 连接器实现并行读取,首先要知道 StarRocks 表的存储的数据分布情况。当前 StarRocks 的 FE 提供了获取单表查询计划接口,通过指定要查询的表及 SQL 进行 API 接口的调用。

如上图所示,右侧是 FE 接口返回的结果经过序列化后对应的数据结构,query plan 为查询计划的字符串。partitions 是一个 map,key 是 StarRocks tablet ID ,value 为 tablet 实际分布在 BE 节点的地址,因为 StarRocks 表的数据是多副本管理,所以会有多个 BE 地址。

通过以上信息信息,就可以知道表中要查询数据的数据分布情况。

06并行读取:spilt 切分(基于数据分布)

要实现并行读取,就需要要对待查询的目标表的数据范围划分,再进行分片切分,让并行的线程读取特定分片的数据。

在 Souce Connector 实现中,分片切分是基于 StarRocks 表的数据分布进行数据范围划分。

如上图所示,左侧描述了 StarRocks 数据分布。StarRocks 使用列式存储,采用分区分桶机制进行数据管理。对应图中表 A 按照日期“月”划分分区,进一步的 2023-01 月份的分区切分为 5 个 分桶(A、B、C、D、E)。

分桶是 StarRocks 中最小的数据管理单元,每个分桶使用多副本进行组织,对应图中分别为分桶 A,有 A-1 、A-2、A-3 个副本;分桶 B 有 B-1、B-2、B-3 等,这些分桶副本最终会存储在不同的 BE 节点中。

假如我们要同步表 A 中 2023 年 1 月份的数据,首先要知道这部分数据的数据分布情况。之前介绍了通过 FE API 可以获取 StarRocks 表的数据分布情况,对应图中,分桶 A 数据保存在 BE-1、BE-2、 BE-3 上。

下一步,通过一定策略,为每一个 tablet 选择最优的 BE 查询节点,原则是最终结果中每一个BE 节点有相对均等数量的分桶等待被查询,这样可以保证在并行查询时,每一个 BE 节查询负载相对均衡。

最后,根据前面为每一个分桶选择查询 BE 节点信息生成的 split 分片。

07 并行读取:用户自定义分片

Source Connector 支持自定义分片,也就是用户可以控制分片生成, 通过 request_tablet-size 这个配置参数制。

刚刚我们介绍了生成 split 分片切分的过程,StarRocks 表 A 的 5 个分桶 A\B\C\D\E, 最终生成了 3 个分片对图中上半部分。假加我们想让查询数据的并发度更高,就需要生成更多的分片。这时,我们可以设置 request_tablet-size,限制每个分片中 tablet 的数量。比如我们配置 request_tablet-siz=1, 表示每个分片的分桶最多为 1, 那么最终将会生成 5 个分片,效果如上图所示。

08 并行读取:分配 spilt 到 reader

Split 切分好了,需要分配给每一个并行的 Reader。Reader 数量的指定是通过在任务的 env 配置并行度(下图左侧所示),配置好就会有几个并行的 Reader 去读取数据源。

如上图所示,右侧是具体分片分配给 Reader 的过程:Split 通过 split 中的属性 ID 向 Reader 数取模,使每一个 Reader 上分配的分片数相对一致。

09 并行读取:Reader 读取数据

将 split 分片配给 Reader 之后,每一个 Reader 就开始实际的数据读取,该过程是每个 Reader 通过 BE 提供的一组 thrift 协议向 BE 节点扫描。分桶对应的数据如图中所示,每个分片包含了需要向哪个 BE 节点查询及需要扫描 BE 上的哪些分桶数据。

​下图是 BE 提供 thrift 协议具体接口。

​有三个重要的方法,首先创建一个scanner ,通过类似游标的方式,多次调用 getnext 获取全部数据,最终数据都完成返回后,通过 close scaner 释放资源。

10 并行读取:arrow -> seatunnel row 的数据转换

Reader 通过 thrift 协议向 BE 节点扫描数据,最终从 BE 获取到的数据是 apache arrrow 的数据格式。

因为 StarRocks 表的数据通过 SeaTunnel 读取出来之后首先要转换为 SeaTunnel 自己的数据结构 SeaTunnelRow,之后才可以在 SeaTunnel 内部进行数据转换及写出,因此需要将 apache arrow 的数据类型转换为 SeaTunnel 的数据类型。

整个转换过程如下图所示:

其中 Apache Arrow 的 varchar 可以根据用户在 source 连接器配置数据投影的数据类型转换为 Date、Timestamp 和 String。

11 数据类型映射

最终从 StarRocks 读取的数据类型,从 BE 节点获取的 apache-arrrow 格式的数据类型以及转换到 SeaTunnel 上的数据类型三者之间的映射关系如下图,也是目前 StarRocks 连接器支持的数据类型映射,基本上覆盖了所有的数据类型,但 ARRAY、HLL、BITMAP 等暂时还不支持。

在使用中我们只需要关心 StarRocks 的 Datatype 和 SeaTunnel Datatype 的映射就可以,apache-arrrow 部分的转换是程序自动完成的。

12 并行读取:状态恢复

在读取的时候还会涉及到状态恢复,因为如果任务读取的数据量比较大,读取的时间会较长,中间一旦出现错误或者异常,需要从出错的位置重新读取,类似于断点续传。

这里面有两个比较重要的过程: 状态保存: 通过 Reader 把未读取的 split 信息存到 state 里,引擎在读取过程会定期对 state 做快照,如 snapshotState 方法的逻辑;

状态恢复:Reader 的状态恢复主要是通过最后一次快照,进行恢复后继续读取。在开始读取数据的时候,从未读取的分片集合中里面去消费,之后开始实际读取,对应 pollNext 方法逻辑。

StarRocks Connector 写入原理

介绍完 Source Connector 的写入原理,我们再来看 Sink 连接器的写入原理。

StarRocks Sink 写入是基于 Stream Load 接口,在写入时需要做处理批量和重试。对于批量,数据是在写入之后,先缓存在内存中,达到一定阈值之后再进行批量数据的提交。

阈值目前包括批数据的大小、数据条数限制,同时连接器也支持定时提交,一定时间间隔下提交一次。

📢注意,在 sink 的时候,需要留意"too many tablet versions" 报错,出现问题是由于导入频率太快,数据没能及时合并(Compaction),从而导致版本数超过支持的最大未合并版本数。

除了优化 BE 的配置,调整合并策略,如 cumulative_compaction_num_threads_per_disk、base_compaction_num_threads_per_disk 等来加快合并,也可以在 sink 端控制批量的提交的阈值,增大单次导入的数据量,降低导入频率。

对于重试,SeaTunnel 支持配置重试策略,如重试次数,等待间隔与最大重试次数等。

01 CDC 数据写入支持

目前,SeaTunnel 已支持数据库变更数据捕获(CDC https://github.com/apache/incubator-seatunnel/issues/3175),以将数据更改实时传送到下游系统。SeaTunnel 将捕获到的数据更改分为以下 4 种类型:

  • INSERT(数据插入)

  • UPDATE_BEFORE(数据更改前的旧值)

  • UPDATE_AFTER(数据更改后的新值)

  • DELETE(数据删除)

在写入的目标数据上面,StarRocks 数据源的主键模型支持通过 Stream Load 导入作业,同时对 StarRocks 表进行数据变更,包括插入、更新和删除数据。

因此,将 SeaTunnel changelog 数据的变更类型转换成 StarRocks 支持的变更类型,使 SeaTunnel Connector 可以支持 CDC 写入 StarRocks。

在 SeaTunnel 中假如 CDC 数据如上图图所示,分别插入主键为 1\2\3 的数据,对主键 1 进行 UPDATE,会生成 update_before、update_after 、dedete 的 cdc changelog event,通过 sink 连接器配置中 enable_upsert_delete = true,开启将 CDC 数据写入 StarRocks 的支持。

StarRocks Connector 使用示例

这里以 StarRocks 之间同步数据这个使用场景为例,介绍如何配置使用连接器。假设在 StarRocks 有一张数据表 customer_1,有四个数据列,我们目标将数据同步到一个张表 customer_2,首先在 SeaTunnel 任务配置文件中配置 Source Connector,数据表有 4 个字段列,我们只需要 2 个字段,所以配置数据投影。

在 Transform Connector 配置中我们进一步进行数据处理,希望将 c_name 字段中 customer前缀去除,保留数字部分同时导入数据字段名称跟目标表名称表不一致,需要通过 SQL 重命名。

最后配置 Sink Connector,配置目标数据源的链接信息,指定 Stream Load 数据导入的文件格式为 JSON。

在 env 里面对任务参数进行指定,如任务的整体并行度,当然也可以在 connector 的配置里面单独指定并行度。

最终导入到目标表的 customer_2,如下图:

连接器后续规划

至此,我们可以看到,SeaTunnel 的基本数据同步功能已经非常完善了,但一些数据同步场景对数据可靠性有着更高的要求,在 Sink 侧需要有仅一次和至少一次的语义支持,这两点已经在社区的支持计划中了。

其中对于 Exactly-Once 语义,StarRocks 2.4 版本提供了 Stream Load 事务接口,为实现高效导入同时兼顾 Exactly-Once 提供了实现的基础。

另外,SeaTunnel 社区还计划在 Source 和 Sink 连接器中支持更多的数据类型映射,如 BITMAP、HLL、Array 等,丰富连接器的功能。

最后,也欢迎 StarRocks 的朋友们一起来为 StarRocks 添砖加瓦,促进生态的融合,让大数据处理回归简单!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/612495.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring为什么默认是单例的?

目录 一、五种作用域 二、单例bean与原型bean的区别 三、单例Bean的优势与劣势 一、五种作用域 1.singleton: singleton是Spring Bean的默认作用域&#xff0c;也就是单例模式。在整个应用程序中&#xff0c;只会创建一个实例&#xff0c;Bean的所有请求都会共享这个实例。 …

ETLCloud轻松应对CDC实时数据流和维度数据合并的需求,实时监控订单数据

如何实现实时流与批流合并打宽数据 通常情况下我们使用CDC实时监听表销售或订单表数据的LOG时会形成流式的数据&#xff0c;即订单变化时数据是按照变化时间不断的传入到ETL的流程中的&#xff0c;业务希望实时看到订单数据的报表。 CDC每次传入的数据有可能是一条也可能是多…

基于geoserver开发地图发布服务

写在前面&#xff1a;我在github上创建了对应的项目&#xff0c;可点此跳转&#xff0c;本文的所有源码均可在项目里找到&#xff0c;欢迎大家访问交流 一、开发背景 在gis领域&#xff0c;geoserver是后端地图发布的开源项目。目前我们在启动服务后&#xff0c;可通过自带的…

科研工具-R-META分析与【文献计量分析、贝叶斯、机器学习等】多技术融合实践

Meta分析是针对某一科研问题&#xff0c;根据明确的搜索策略、选择筛选文献标准、采用严格的评价方法&#xff0c;对来源不同的研究成果进行收集、合并及定量统计分析的方法&#xff0c;最早出现于“循证医学”&#xff0c;现已广泛应用于农林生态&#xff0c;资源环境等方面。…

【AIGC】14、GLIPv2 | 在 GLIP 上扩展 negative phrase 并新增分割功能

文章目录 一、背景二、方法2.1 A Unified VL Formulation and Architecture2.2 GLIPv2 pre-training2.3 将 GLIPv2 迁移到 Localization 和 VL task 三、结果3.1 One model architecture for all3.2 One set of model parameters for all3.3 GLIPv2 as a strong few-shot learn…

Latex使用algorithm2e包写伪代码

用Latex写伪代码我们需要用到一个包&#xff0c;Algorithm2e&#xff0c;这个工具包的使用手册下载地址为&#xff08;http://mlg.ulb.ac.be/files/algorithm2e.pdf&#xff09;CSDN的链接为&#xff08;&#xff09; 准备 导入该包 \usepackage[ruled,linesnumbered]{algor…

上海亚商投顾:沪指小幅震荡微涨 AI应用端持续活跃

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 市场情绪 大小指数今日走势分化&#xff0c;沪指全天窄幅震荡&#xff0c;创业板指低开低走&#xff0c;盘中一度跌超1.6%&a…

【Java基础】I/O流 —— Java中的流都需要关闭吗?

目录 一、为什么要关闭流&#xff1f;二、close方法和flush方法1.使用close方法2.使用flush方法 三、流按指向分类四、不用关闭的流 一、为什么要关闭流&#xff1f; 涉及到对外部资源的读写操作&#xff0c;包括网络、硬盘等等的I/O流&#xff0c;如果在使用完毕之后不关闭&a…

Unity基础框架从0到1(六)对象池模块

索引 这是Unity基础框架从0到1的第六篇文章&#xff0c;框架系列的项目地址是&#xff1a;https://github.com/tang-xiaolong/SimpleGameFramework 文章最后有目前框架系列的思维导图&#xff0c;前面的文章和对应的视频我一起列到这里&#xff1a; 文章 Unity基础框架从0到…

算力不竭如江海,天翼云“息壤”如何助力千行百业算力智能调度?

科技云报道原创。 数字时代下&#xff0c;算力已成为新型生产力&#xff0c;并朝着多元泛在、安全可靠、绿色低碳的方向演进。以算力为核心的数字信息基础设施&#xff0c;是国家战略性布局的关键组成部分&#xff0c;也成为数字经济时代的“大国重器”。 作为云服务国家队&am…

报表生成器FastReport .Net教程:“Text“对象、文本编辑

FastReport .Net是一款全功能的Windows Forms、ASP.NET和MVC报表分析解决方案&#xff0c;使用FastReport .NET可以创建独立于应用程序的.NET报表&#xff0c;同时FastReport .Net支持中文、英语等14种语言&#xff0c;可以让你的产品保证真正的国际性。 FastReport.NET官方版…

es elasticsearch 十四 各种机制 评分机制 正序索引 解决跳跃结果问题 解决耗时过长问题 解决相同属性值都到一个地方

目录 评分机制 机制 查看评分实现如何算出来的explaintrue 分析能否被搜索到 Doc value 正排序索引 Query phase Fetch phase Preference 问题 解决跳跃结果问题 Timeout 到达时间直接返回&#xff0c;解决耗时过长问题 Routing 数据准确分配到某地&#xff0c;解决相…

这才叫软件测试工程师,你那最多是混口饭吃罢了....

前些天和大学室友小聚了一下&#xff0c;喝酒喝大发了&#xff0c;谈天谈地谈人生理想&#xff0c;也谈到了我们各自的发展&#xff0c;感触颇多。曾经找工作我迷茫过、徘徊不&#xff0c;毕业那会我屡屡面试失败&#xff0c;处处碰壁&#xff1b;工作两年后我一度想要升职加薪…

006+limou+C语言“堆的实现”与“树的相关概念”

0.前言 这里是limou3434的一篇个人博文&#xff0c;感兴趣可以看看我的其他内容。本次我给您带来的是树的相关只是&#xff0c;并且把堆这一数据结构做了实现&#xff0c;后面还有大量的oj题目。但是树重点也就在这十多道oj题目中&#xff0c;您可以尝试着自己做一下&#xff…

我的创作纪念日|写在CSDN创作第512天

机缘 今天无意中发现CSDN后台给我发送私信&#xff0c;才发觉原来我的第一篇博客更新已经过去512天了&#xff0c;512天一晃而过居然还有点恍然。 作为一名网络专业的在校大学生&#xff0c;最初开始查找相关的资料其实更习惯于从外站进行查找&#xff0c;却总是在不经意中进入…

人事管理项目-前端实现

人事管理项目-前端实现 引入Element和Axios开发Login页面配置路由配置请求转发启动前端项目 引入Element和Axios 前端UI使用Element&#xff0c;网络请求则使用Axios&#xff0c;因此首先安装Element和Axios依赖&#xff0c;代码如下&#xff1a; 依赖添加成功后&#xff0c;接…

N-propargyloxycarbonyl-L-lysine,1215204-46-8,是一种基于赖氨酸的非天然氨基酸 (UAA)

产品描述&#xff1a; N-ε-propargyloxycarbonyl-L-lysine (H-L-Lys(Poc)-OH) 是一种基于赖氨酸的非天然氨基酸 (UAA)。 广泛用于多种生物体中荧光探针的生物偶联。 N- ε- Propargyloxycarbonyl-L-lysine (H-L-Lys (Poc) - OH) is a non natural amino acid (UAA) based on …

Kotlin Channel系列(一)之读懂Channel每一行源码

文章目录 有话说概述初识ChannelChannel种类Channel五大金刚SendReceiveClosedQueueBuffer Channel的行为Channel源码分析发送数据大动脉接收数据大动脉父类默认实现方式(RendezvousChannel)发送流程send()函数onSend()函数 接收流程receiveCatching()函数onReceiveCatching()函…

基于图像处理的圆检测与深度学习

基于图像处理的圆检测与深度学习 摘 要一、 绪论二 、图像预处理2.1 滤波算法2.2 边缘检测 三 、圆识别与定位算法3.2 定位算法3.2.1 迭代算法 4.1 数据处理 五、深度学习介绍&#xff1a;参考文献 摘 要 本文主要论述在图像处理的的基础上&#xff0c;为了克服图像背景中的亮…

SpringBootWeb案例-2(下)

3. 修改员工 需求&#xff1a;修改员工信息 在进行修改员工信息的时候&#xff0c;我们首先先要根据员工的ID查询员工的信息用于页面回显展示&#xff0c;然后用户修改员工数据之后&#xff0c;点击保存按钮&#xff0c;就可以将修改的数据提交到服务端&#xff0c;保存到数据…