使用Apache SeaTunnel进行二次开发的实践分享

news2024/9/28 17:29:55

大家好,我是范佳,是Apache SeaTunnel社区的PMC member。今天给大家分享一些基于Apache SeaTunnel二次开发的内容。

file

这部分内容主要涉及代码层面的知识,如果大家有什么疑问,欢迎来社区找我交流!

引言

大部分数据开发工程师基于Apache SeaTunnel的二次开发,可能做的就是任务提交,任务的一些监控,还有在任务没有跑起来之前,我们可能需要预先知道跑起来之后的可能一些结果。

基于以上内容,我将从五个部分来分享相关内容:

  1. SeaTunnel介绍
  2. SeaTunnel的启动能力
  3. 任务监控的定制化
  4. 行为预览与Sink的关联

Apache SeaTunnel 是一个高性能的实时和离线数据批处理平台,自加入Apache软件基金会以来已有两年多时间,期间社区不断发展,增加了许多新功能和特性。

感兴趣的同学可以访问官网:https://seatunnel.apache.org/

Apache SeaTunnel的核心特性

多引擎支持

SeaTunnel支持多种数据处理引擎,包括市场上流行的开源引擎如Spark和Flink,以及SeaTunnel自研的Zeta引擎。这使得SeaTunnel能够灵活应对不同的数据处理需求,无论是大规模数据集还是实时数据流。

海量连接器

项目提供了广泛的连接器支持,使得SeaTunnel可以轻松接入各种数据源和目的地,从而简化了数据集成过程。这一特性对于需要将数据从多个源汇总到单一系统的企业尤为重要。

HTTP支持

对HTTP的支持是SeaTunnel的又一亮点,特别是对于开发者来说,因为它可以显著降低适配成本。通过HTTP支持,开发者可以更容易地将SeaTunnel集成到现有的Web应用和服务中。

流批一体

SeaTunnel的流批一体功能确保了无缝的数据处理,无论是流处理还是批处理,都能在同一个平台上高效运行。这一特性简化了架构,减少了维护的复杂性。

流速控制

作为一个数据同步引擎,SeaTunnel提供了流速控制功能,这对保护下游系统不被过载非常关键。尤其是在上游数据量大而下游系统承载能力有限的场景中,流速控制显得尤为重要。

自动建表

自动建表功能可以极大地帮助简化数据处理流程,特别是对下游系统来说。这一功能允许SeaTunnel根据数据自动创建表结构,减少了手动介入的需要,提高了数据管道的灵活性和效率。

SeaTunnel的启动

一般来讲,我们基于开源软件二开,第一步就是启动,而启动SeaTunnel任务的第一步是准备用户界面,确保二开后的用户可以通过界面触发或定时提交任务。

一旦用户界面设置完成,以下是使用Shell脚本提交任务的基本步骤:

编写Shell脚本:创建一个Shell脚本,用于封装启动命令和任务参数。

file

执行命令:通过执行Shell脚本来提交任务到SeaTunnel引擎。

Job ID的获取与管理

在任务提交时,我们的引擎会返回一个任务ID。

这个ID在使用脚本模式启动时只会打印在日志文件中。如果需要监控任务,需要解析日志文件以获取任务ID。

然而,这种方式比较滞后,因为ID是引擎端生成的,可能需要等待一段时间才能得到。

为了解决这个问题,我们新增了一个功能,允许在提交任务时配置自定义ID。这个ID可以由第三方服务或集成SeaTunnel的平台生成,然后传递给SeaTunnel,SeaTunnel会使用该ID作为任务的唯一标识。

这项功能虽然小,但对于二次开发或集成非常有用,避免了解析日志或等待SeaTunnel生成ID的过程。

提交任务的三种方式

Shell模式

通过Shell脚本启动任务时,可以在日志文件中获取任务ID。

REST API模式

我们也支持通过HTTP提交任务。这种方式无需额外启动客户端,对第三方集成更加友好。

file

HTTP提交任务的方式更加自然和通用。

SeaTunnel Client 模式

对于更深度、精细化和功能更强大的任务提交方式,推荐使用SeaTunnel Client。

file

SeaTunnel Client是一个核心类,通过它可以提交所有任务。无论是引擎内部代码还是外部集成代码,都可以使用这个Client提交任务到集群。

通过SeaTunnel Client,我们可以在JVM进程中直接提交任务。例如,在一个Spring服务中,用户点击启动按钮后,后端可以直接使用SeaTunnel Client提交任务,而不需要启动一个额外的HTTP或Java进程。

这种方式的好处包括:

  • 及时响应任务状态:任务的状态可以通过回调机制及时返回客户端。
  • 异步操作:任务执行时,客户端会返回一个Future,可以通过Future进行异步操作。例如,任务结束时注册回调函数来处理业务逻辑。
  • 深度集成:这种方式使SeaTunnel与二次开发的平台集成更加深度和紧密。

任务监控

启动任务后,我们需要对SeaTunnel进行监控,以了解任务的状态。

例如,任务是否启动成功?运行了多久?数据是否成功读取?任务是否失败?失败的原因是什么?这些都是二次开发时需要关注的内容,因为我们不能保证所有任务都能正常运行。

监控任务状态

我们可以通过以下三种方式监控任务状态:

  1. Shell脚本:通过Shell脚本查看所有任务的状态,例如任务是正在运行、失败还是完成。
  2. HTTP:通过HTTP接口获取任务状态,例如任务是否失败以及失败原因。
  3. SeaTunnel Client:SeaTunnel Client不仅可以提交和取消任务,还可以查询任务状态。

比如说下面的截图,这个就是一个job result

file 然后这个 job result 也是我们SeaTunnel client 返给我们的,然后我们就可以看到里面的状态。

file 如何调用SeaTunnel Client?

传入任务ID即可获取任务状态,任务是正在运行还是失败。对于集成开发来说,获取任务异常信息非常重要。如果通过Shell脚本查看日志,用户需要手动解析日志文件。

这在集成的Web页面中并不方便。因此,我们推荐通过HTTP或SeaTunnel Client获取异常信息。

指标监控(Metrics)

除了监控任务状态之外,我们还需要有指标。

例如,任务虽然在运行,但它是否真正读取到了数据?读取了多少数据?写入了多少数据?吞吐量是多少?这些都是需要关注的指标。

SeaTunnel引擎内部提供了对应的指标获取方式,有以下三种方式: 1.Shell脚本:通过Shell脚本可以查询任务的各项指标。 2.HTTP:通过HTTP接口可以获取任务的各项指标。 3.SeaTunnel Client:通过SeaTunnel Client可以查询任务的各项指标。

核心监控指标

我们可以监控的核心指标包括:

●读取数量 ●读取的字节数 ●QPS(每秒查询率) ●每秒字节数 ●写入数量 ●写入的字节数

对于CDC(Change Data Capture),我们比较关心的是CDC的延迟,即从CDC源端的数据产生到SeaTunnel读取到它的延迟是多少。

目前,我们的支持是每个任务级别的,但对于每个任务中的每张表的支持还比较弱,因为SeaTunnel支持多表任务,即一个任务可以读取和写入多张表。我们正在改进这方面的支持。

指标暴露

除了查询指标外,我们还可以将指标定时对外暴露,例如暴露到Prometheus或SeaTunnel的指标体系中。

目前,SeaTunnel对这方面的支持还比较弱,但我们希望在未来能更好地支持将指标对外抛出到第三方组件,如Prometheus,这样对用户会更友好。

定制化指标

我们提供的默认指标可能不能满足所有用户或开发者的需求。那么,如何定制属于内部系统或二次开发系统的指标呢?

file

定制化指标集成实际上很简单。可以通过我们的context对象来实现。这个context对象包含一个MetricsContext对象,我们可以向其中注册自定义指标。

file

  1. 定义指标名称:这是一个字符串,代表指标的名称。
  2. 注册指标:通过MetricsContext对象注册自定义指标。
  3. 更新指标值:当需要更新指标值时,通过调用提供的方法将值更新到对象中。

这样就完成了定制化指标的集成,通过这种方式注册的自定义指标,可以通过HTTP、Shell脚本或SeaTunnel Client查询和展示。

事件系统

除了指标外,如果需要一些瞬发性的事件处理,例如在某些事件发生时收到通知,可以使用SeaTunnel内部设计的事件系统。

file

事件示例

SeaTunnel的事件系统可以处理以下事件:

  • Reader打开和关闭事件
  • Task打开和关闭事件
  • 自定义事件 当这些事件发生时,SeaTunnel会将事件汇总,并发送到Master节点进行处理。

file

DDL 事件

后续我们会实现DDL事件的发送功能。社区正在开发的DDL功能主要是为了应对schema变化,例如在MySQL CDC运行过程中,schema发生变化会产生DDL事件。

我们可以将这些DDL事件包装成对应的事件发送出去。外部系统可以接收到这些事件,比如某个表增加了一列或删除了一列,然后进行相应的展示或处理。这是事件系统的作用。

自定义事件

就像我们可以自定义metrics一样,事件也可以自定义。自定义事件的方式与metrics非常相似。用户可以实现自己的事件来处理特定业务需求。

file

自定义事件的实现

在metrics中,可以通过context对象获取MetricsContext。同样地,在事件系统中,我们可以获取EventListener,然后通过它注册和处理自定义事件。

我们提供了对应的接口EventHandler,它是一个SPI实现。用户可以实现自己的handler,然后将其放到lib目录下,或者打包到应用中。

file

有了这个handler之后,Master节点会发现所有的EventHandler,并调用它们的handle方法。具体的事件处理逻辑由实现的handler决定。

默认事件处理实现

我们内部提供了一个默认的实现:JobEventHttpReporterHandler。这个handler会将事件通过HTTP接口发送到用户配置的地址。

file

用户可以通过这个接口接收引擎中的事件,例如任务开始、任务结束、数据到达等。

事件系统不仅用于捕获运行时的事件,还可以用于DDL事件。例如,MySQL CDC运行过程中,schema变化会产生DDL事件。我们可以将这些DDL事件包装并发送出去,外部系统可以接收到这些事件并进行相应处理,例如展示schema变化、执行后续操作等。

集群节点健康状况监控

除了任务级别的监控,我们还需要关注集群节点的健康状况。作为一个集群系统,了解整个集群是否正常运作非常重要。这些信息可以通过SeaTunnel Client获取。

file

获取集群健康信息

通过SeaTunnel Client,我们可以获取到集群的一些健康信息。这些信息包括但不限于:

  • 内存使用情况
  • GC次数
  • RPC操作延迟
  • RPC请求次数

这些与性能和集群稳定性相关的信息能够帮助我们更好地监控和维护系统。

例如,我们可以通过SeaTunnel Client获取集群节点的健康状况,并在页面上展示出来。如果在3个节点的集群中,只有2个节点正常,我们可以通过接口判断并处理异常节点。

SaveMode

SaveMode与Sink密切相关,决定了在写入数据之前执行的一系列操作。这些操作包括自动建表、表重建、数据清空或数据追加。

通过配置schema_save_mode和data_save_mode,可以定义这些行为。

SaveMode配置示例
  • schema_save_mode:定义如何处理schema。例如,是否重新创建schema,或者在不存在时创建。
  • data_save_mode:定义如何处理数据。例如,是否清空数据,或者追加数据。

行为预览

我们预览的核心是 SaveMode 到底会怎么操作。这一块是纯代码层面,如果要集成的话,肯定需要写代码。虽然不像 HTTP 那么简单,但它非常有用。

例如,我现在任务还没开始跑,或者即将定时运行。我想知道在配置了表重建的情况下,任务到底会创建表还是不会创建表。在任务运行前,我们可以通过行为预览确定 SaveMode 和 data SaveMode 的行为。这对于涉及表操作的情况尤其重要,因为表操作可能比较敏感。

数据结构的变化

比如说我们从 source 端读取的是 MySQL 的表,MySQL 表在二次开发中可能会涉及到一个 CatalogTable。

file

我们会将外部系统的表抽象成内部统一的 CatalogTable。例如,从 MySQL 读取一张表,然后转换成系统内部的 CatalogTable。

file

表结构预览

如果任务配置读取表 a,我们可以通过页面上的一些操作,预览表 a 的输出结构。

file

具体步骤如下:

  1. 获取 TableSourceFactory。
  2. 将配置传入,构建 TableSource。
  3. 通过 source 提供的方法获取 productCatalogTable。

file

这种预览在任务还没有真正跑起来时就可以执行,确保任务读取的表结构是正确的。

Transform 预览

例如,我们有 SQL 作为 transform 操作,希望在 SQL 中改一个字段的类型,同时增加和删除一些字段。

file

预览功能可以在任务运行前确认这些操作是否会如期执行。

具体步骤如下:

  1. 获取 TransformFactory,构建 transform。
  2. 将配置传入,并传入 source 端生成的 CatalogTable。
  3. 获取 transform 输出的表结构,确认 transform 操作是否正确。

Sink 端操作预览

从 transform 输出的表结构,需要传入 Sink 进行写入操作。涉及到自动建表时,我们可以通过 SaveModeHandler 确认以下内容:

  1. Sink 是否需要建表?
  2. 表名是什么?
  3. 字段有哪些?

SaveMode handler 会根据 schema_save_mode 和 data_save_mode 配置,以及 catalog 中的表判断是否需要建表。

SaveMode Handler 的作用

当我们具体操作Catalog,比如说Catalog 有一个 exist 的方法去判断我们的 table pass ,根据我们的 schema_save_mode, data_save_mode 去判断我们的接下来的这一块的行为到底是什么?

SaveMode Handler 提供了能力,例如:

  • 判断当前的 schema_save_mode 和 data_save_mode 配置。
  • 处理表的路径(TablePath)。
  • 调用 catalog 提供的方法判断表是否存在。

通过 SaveModeHandler 提供的能力,可以预览和确认任务在运行时是否会创建表或进行其他操作。

执行 SQL 预览

我们执行 SQL 时,可以提前看到将要建表的 SQL。例如:

  • 对于某些用户来说,SQL 可能比较敏感。
  • 预先知道 SQL 是否合理,如果不合理,可以在基础上修改后手动创建。

我们提供了 Catalog 预览功能,调用 preAction 方法可以预览建表或删除表的 SQL。

在建表时,输出表结构的类型非常重要。我们需要知道内存中看到的类型在自动建表时会被建成什么类型。

为此,SeaTunnel 内部有一套叫 TypeConverter 的接口体系。

TypeConverter 的作用
  1. 类型转换:将 SeaTunnel 的类型转换成数据库的类型。
  2. 反向转换:读取表时,将数据库的类型转换成 SeaTunnel 的类型。

通过 TypeConverter,我们可以预览并确认 SeaTunnel 和数据库之间的字段类型交互。例如,通过转换和反向转换,我们可以知道表字段类型在 SeaTunnel 和数据库之间的具体表现。

类型转换(TypeConverter)

在行为预览中,我们可以通过 TypeConverter 接口体系实现类型转换的预览。预览与实际运行时的转换结果一致,因为实际运行中也是通过这套代码进行类型转换。

示例

通过集成 Type Converter 接口,我们可以在预览时确认建表的具体类型。

file

例如:

  • 将 SeaTunnel 类型转换为数据库类型。
  • 读取表字段时,确认数据库字段类型在 SeaTunnel 中的表现。

总结

今天给大家主要分享了以下内容:

  1. 多种启动方式:包括三种主要的启动方式。
  2. 指标获取:如何获取指标信息。
  3. 错误信息获取:如何获取并处理错误信息。
  4. 自定义指标:如何创建和获取自定义指标。
  5. 事件系统:如何创建、读取和处理事件,包括自定义事件和现有事件的读取。
  6. 系统相关信息获取:如何获取集群和节点的健康状况等信息。
  7. 数据结构预览:如何预览数据结构、建表 SQL 和外部系统的交互类型。

SeaTunnel 内部已经实现了许多功能,通过集成这些功能,可以实现更高效、更兼容的二次开发。

希望这些接口和设计能让大家在集成和二次开发时更加简单和高效,欢迎大家基于这些标准化体系进行扩展,并将实现的功能回馈给社区,使 SeaTunnel 更加丰富和强大。

通过本文的分享,能够帮助大家对 SeaTunnel 的二次开发有更深入的了解。如果大家有任何问题,欢迎随时与我交流。谢谢大家!

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1948975.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

谷粒商城实战笔记-62-商品服务-API-品牌管理-OSS整合测试

文章目录 一,Java中上传文件到阿里云OSS1,整合阿里云OSS2,测试上传文件 二,Java中整合阿里云OSS服务指南引言准备工作1. 注册阿里云账号2. 获取Access Key3. 添加依赖 实现OSS客户端1. 初始化OSSClient2. 创建Bucket3. 上传文件4.…

Redis的五种数据类型与命令

目录 引言 一 Redis的特性 二 Redis的安装 三 Redis的优点 四 Redis的五种数据类型与命令 五 Redis的配置文件 引言 Redis是什么? Remote Dictionary Service(远程字典服务器) Redis 是一个开源的(BSD许可)的,C语言编写的,高性能的数…

Windows电脑如何启动RTSP服务实现本地摄像头数据共享

技术背景 提起Windows共享本地摄像头,好多人想到的是通过ffmepg或vlc串流到服务器,实际上,用轻量级RTSP服务更简单,本文就介绍下,如何用大牛直播SDK的Windows轻量级RTSP服务,采集摄像头,生成本…

记录使用el-form的resetFields时遇到的表单数据回显失败的问题,去除nextTick解决

首先简单介绍一下resetFields的基础作用 element-plus官网el-form介绍 本案例中实现点击每行的编辑按钮时,弹出弹窗和表单 由于设置了表单校验,如图,表单内容不符合设定的校验规则时会有提示 如果仅仅这样就会出现问题,下次打…

分布式搜索引擎ES--Elasticsearch集群

1.Elasticsearch集群的概念 分片机制:每个索引都可以被分片 索引my_doc只有一个主分片;索引shop有三个主分片;索引shop2有5个主分片;(参考前面案例) 每个主分片都包含索引的数据,由于目前是单机,所以副分片是没有的&a…

PyQt ERROR:ModuleNotFoundError: No module named ‘matplotlib‘

Solution:打开cmd输入指令下载malplotlib pip install matplotlib

【Docker】CentOS7环境下的安装

环境展示 安装 配置仓库 sudo yum install -y yum-utils # docker官方key文件下载 sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo # 建议使用阿里云key文件下载 sudo yum-config-manager --add-repo https://mirrors.aliyun.…

KubeSphere介绍及一键安装k8s

KubeSphere介绍 官网地址:https://kubesphere.io/zh/ KubeSphere愿景是打造一个以 Kubernetes 为内核的云原生分布式操作系统,它的架构可以非常方便地使第三方应用与云原生生态组件进行即插即用(plug-and-play)的集成&#xff0…

【Android】数据存储方案——文件存储、SharedPreferences、SQLite数据库用法总结

文章目录 文件存储存储到文件读取文件 SharedPreferences存储存储获取SharedPreferences对象Context 类的 getSharedPreferences() 方法Activity 类的 getPreferences() 方法PreferenceManager 类中的 getDefaultSharedPreferences() 方法 示例 读取记住密码的功能 SQLite数据库…

【解决方案】华普微汽车智能钥匙解决方案

一、方案概述 1.什么是被动式无钥匙进入 "被动式无钥匙进入"(Passive Keyless Entry)是一种用于车辆、建筑物或其他设施的访问控制系统。它利用无线射频技术自动判断用户是否接近,并进行身份识别以执行开锁或落锁动作&#xff0c…

【Unity2D 2022:UI】TextMeshPro组件无法显示中文

在Unity中创建了一个预制体Card,上面挂载了一些Text Mesh Pro组件用来显示卡牌信息。但是在输入文字后,发现无法显示中文: 解决方法如下: 一、导入字体文件(ttf格式)和常用字字集(txt格式&…

leetcode日记(51)不同路径Ⅱ

和上一道题(无障碍物的最短路径)很像,但事实上比上一题多了优化方法 根据上一题改的代码如下,添加了对障碍物的判定,如果有障碍物则将数组值设为0。 class Solution { public:int uniquePathsWithObstacles(vector&l…

理发店收银管理系统 python、sqlite3、pyqt5

给姐姐家店写的一个,功能比较简单,结合gpt 功能包含:次卡和充值卡,可以查剩余次数、以及查找消费记录 后期会把sqlite3转到mysql,换成springboot的一个项目 1.使用技术: Python、sqlite3、PyQt5 2.页面 …

快手可灵视频生成大模型全方位测评

快手视频生成大模型“可灵”(Kling),是全球首个真正用户可用的视频生成大模型,自面世以来,凭借其无与伦比的视频生成效果,在全球范围内赢得了用户的热烈追捧与高度评价。截至目前,申请体验其内测…

如何使用C#自制一个Windows安装包

原文链接:https://www.cnblogs.com/zhaotianff/p/17387496.html 以前都在用InstallShield制作安装包,基本需求是能满足的,但也有一些缺点: 1、界面不能完全定制 2、不能直接调用代码里的功能 平常使用一些其它软件,…

ETL数据集成丨将PostgreSQL数据库数据实时同步至PostgreSQL

前言 我们在进行数据集成、实时数据同步中,经常会出现在同一个数据库中做数据同步和复制、实时分析和报告、负载均衡和高可用性等场景,这次我们以PostgreSQL为例,通过ETLCloud工具,进行同数据库中数据实时同步的步骤应该如何设置…

Anconda 快速常用命令简洁版

目的:简单清楚的使用基本的conda 命令 可能需求 查看项目中的虚拟环境及依赖是否满足需求操作新环境来满足项目或者论文的实现 Anconda 常用命令 conda 查看基础命令1. 进入Anaconda 环境2. 查看版本3.查看有哪些虚拟环境4.激活虚拟环境5. 进入虚拟环境查看6. 退出…

shopee虾皮 java后端 一面面经 整体感觉不难

面试总结:总体不难,算法题脑抽了只过了一半,面试官点出了问题说时间到了,反问一点点,感觉五五开,许愿一个二面 1.Java中的锁机制,什么是可重入锁 Java中的机制主要包括 synchronized关键字 Loc…

微信小程序之计算器

在日常生活中,计算器是人们广泛使用的工具,可以帮助我们快速且方便地计算金额、成本、利润等。下面将会讲解如何开发一个“计算器”微信小程序。 一、开发思路 1、界面和功能 “计算器”微信小程序的页面效果如图所示 在计算器中可以进行整数和小数的…

NET8部署Kestrel服务HTTPS深入解读TLS协议之Certificate证书

Certificate证书 Certificate称为数字证书。数字证书是一种证明身份的电子凭证,它包含一个公钥和一些身份信息,用于验证数字签名和加密通信。数字证书在网络通信、电子签名、认证授权等场景中都有广泛应用。其特征如下: 由权威机构颁发&…