Seatunnel本地模式快速测验

news2025/1/23 2:23:29

前言

SeaTunnel(先前称为WaterDrop)是一个分布式、高性能、易于扩展的数据集成平台,旨在实现海量数据的同步和转换。它支持多种数据处理引擎,包括Apache Spark和Apache Flink,并在某个版本中引入了自主研发的Zeta引擎。SeaTunnel不仅适用于离线数据同步,还能支持CDC(Change Data Capture)实时数据同步,这使得它在处理多样化数据集成场景时表现出色。

本节内容作为官方的一个补充测验,快速开始体验吧。


一、Apache Seatunnel是什么?

从官网的介绍看:
Next-generation high-performance, distributed, massive data integration tool.
通过这几个关键词你能看到它的定位:下一代,高性能,分布式,大规模数据集成工具。

那到底好不好用呢?

二、安装

  1. 下载
https://seatunnel.apache.org/download

推荐:v2.3.5

  1. 安装
    环境:Java8
    安装插件:修改 config/plugin_config 以下是一些常用的,不要的暂时不装。
--connectors-v2--
connector-cdc-mysql
connector-clickhouse
connector-file-local
connector-hive
connector-jdbc
connector-kafka
connector-redis
connector-doris
connector-fake
connector-console
connector-elasticsearch
--end--

然后执行:
➜ seatunnel bin/install-plugin.sh 2.3.5
等待执行完毕,就安装完了,很简单。

三、测试

1. 测试 local模式下的用例

修改下模板的测试用例,然后执行如下命令:

bin/seatunnel.sh --config ./config/v2.batch.config -e local

任务的配置很简单:
这里使用了FakeSource来模拟输出两列,通过设置并行度=2 来打印 16 条输出数据。
2024-07-01 21:56:06,617 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Parsed config file:
{
    "env" : {
        "parallelism" : 2,
        "job.mode" : "BATCH",
        "checkpoint.interval" : 10000
    },
    "source" : [
        {
            "schema" : {
                "fields" : {
                    "name" : "string",
                    "age" : "int"
                }
            },
            "row.num" : 16,
            "parallelism" : 2,
            "result_table_name" : "fake",
            "plugin_name" : "FakeSource"
        }
    ],
    "sink" : [
        {
            "plugin_name" : "Console"
        }
    ]
}

 任务的输出信息,这里的输出组件是 Console所以打印到了控制台
2024-07-01 21:56:07,559 INFO  [o.a.s.c.s.f.s.FakeSourceReader] [BlockingWorker-TaskGroupLocation{jobId=860156818549112833, pipelineId=1, taskGroupId=30000}] - Closed the bounded fake source
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : hECbG, 520364021
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : LnGDW, 105727523
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : UYXBT, 1212484110
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : NYiCn, 1208734703
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : cSZan, 151817804
任务的统计信息:
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-07-01 21:56:06
End Time                  : 2024-07-01 21:56:08
Total Time(s)             :                   2
Total Read Count          :                  32
Total Write Count         :                  32
Total Failed Count        :                   0
***********************************************

2. 使用 Flink引擎

在上面的测试用例中可以看到如下的日志输出:

 Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='FakeSource'

这表示默认情况下它使用的是 seatunnel engine 执行的,官方称之为 zeta 。 这一块内容我们先看下 Flink引擎这边是如何执行的。

  1. 下载安装 flink1.17
    https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/

    启动local cluster 模式

    ➜  flink bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host MacBook-Pro-2.local.
    Starting taskexecutor daemon on host MacBook-Pro-2.local.
    
  2. 配置环境变量

    ➜  config cat seatunnel-env.sh
    # Home directory of spark distribution.
    SPARK_HOME=${SPARK_HOME:-/Users/mac/apps/spark}
    # Home directory of flink distribution.
    FLINK_HOME=${FLINK_HOME:-/Users/mac/apps/flink}
    
  3. 修改slot插槽数量为大于等于 2
    为什么?因为默认的配置中配置了 2 个并行度,而 local启动的默认情况下只有个插槽可供使用,因此任务无法运行。
    在这里插入图片描述
    默认启动后资源插槽:
    在这里插入图片描述
    提交程序运行后,发现一直无法对 sourcez做任务切分:
    在这里插入图片描述

    这是因为 job 的并行度是 2,如下所示:
    在这里插入图片描述

    在这里插入图片描述

    因此需要修改插槽数量才可以运行,官方这点可没说清楚,需要注意下。

  4. 运行测试用例

    ➜  seatunnel bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template
    Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /Users/mac/server/apache-seatunnel-2.3.5/starter/seatunnel-flink-15-starter.jar --config ./config/v2.streaming.conf.template --name SeaTunnel
    Job has been submitted with JobID 9a949409a6f218d50b66ca22cc49b9c4
    

    现在我们修改插槽数量为 2,测试如下:
    访问:http://localhost:8081/#/overview
    在这里插入图片描述
    TaskManager输出日志如下:
    在这里插入图片描述

3. 使用 Spark引擎

  1. 提交命令
➜  seatunnel bin/start-seatunnel-spark-3-connector-v2.sh \
--master 'local[4]' \
--deploy-mode client \
--config ./config/v2.streaming.conf.template

Execute SeaTunnel Spark Job: ${SPARK_HOME}/bin/spark-submit --class "org.apache.seatunnel.core.starter.spark.SeaTunnelSpark" --name "SeaTunnel" --master "local[4]" --deploy-mode "client" --jars "/Users/mac/server/seatunnel/lib/seatunnel-transforms-v2.jar,/Users/mac/server/seatunnel/lib/seatunnel-hadoop3-3.1.4-uber.jar,/Users/mac/server/seatunnel/connectors/connector-fake-2.3.5.jar,/Users/mac/server/seatunnel/connectors/connector-console-2.3.5.jar" --conf "job.mode=STREAMING" --conf "parallelism=2" --conf "checkpoint.interval=2000" /Users/mac/server/apache-seatunnel-2.3.5/starter/seatunnel-spark-3-starter.jar --config "./config/v2.streaming.conf.template" --master "local[4]" --deploy-mode "client" --name "SeaTunnel"

遇到报错:

2024-07-01 23:25:04,610 INFO v2.V2ScanRelationPushDown:
Pushing operators to SeaTunnelSourceTable
Pushed Filters:
Post-Scan Filters:
Output: name#0, age#1

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/write/Write

看样子是缺少包导致的导致的,可以参见 issue讨论https://github.com/apache/seatunnel/issues/4879 貌似需要 spark 版本 >=3.2 ,而我的是 3.1.1 因此当前这个问题暂时无解。

Since spark 3.2.0, buildForBatch and buildForStreaming have been deprecated in org.apache.spark.sql.connector.write.WriteBuilder. So you should keep spark version >= 3.2.0.

于是,我便下载了 3.2.4(spark -> spark-3.2.4-bin-without-hadoop) 测试后出现了新的问题。

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/spi/Filter
	at java.lang.Class.getDeclaredMethods0(Native Method)
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
	at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
	at java.lang.Class.getMethod0(Class.java:3018)
	at java.lang.Class.getMethod(Class.java:1784)
	at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:684)
	at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:666)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.spi.Filter
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:359)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 7 more

这说的是 log4j的 jar包似乎不存在,由于我们使用的 spark 版本没有 hadoop的依赖,因此需要在 spark-env.sh里面配置相关的属性,如下:

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home
export HADOOP_HOME=/Users/mac/apps/hadoop
export HADOOP_CONF_DIR=/Users/mac/apps/hadoop/etc/hadoop
export SPARK_DIST_CLASSPATH=$(/Users/mac/apps/hadoop/bin/hadoop classpath)
export SPARK_MASTER_HOST=localhost
export SPARK_MASTER_PORT=7077

再次提交测试后,结果如下:

24/07/02 13:40:19 INFO ConfigBuilder: Parsed config file:
{
    "env" : {
        "parallelism" : 2,
        "job.mode" : "STREAMING",
        "checkpoint.interval" : 2000
    },
    "source" : [
        {
            "schema" : {
                "fields" : {
                    "name" : "string",
                    "age" : "int"
                }
            },
            "row.num" : 16,
            "parallelism" : 2,
            "result_table_name" : "fake",
            "plugin_name" : "FakeSource"
        }
    ],
    "sink" : [
        {
            "plugin_name" : "Console"
        }
    ]
}

24/07/02 13:40:19 INFO SparkContext: Running Spark version 3.2.4
24/07/02 13:40:25 INFO FakeSourceReader: wait split!
24/07/02 13:40:25 INFO FakeSourceReader: wait split!
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits for table fake successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits for table fake successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigned [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16), FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)] to 2 readers.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigned [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16), FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)] to 2 readers.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigning splits to readers 1 [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16)]
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigning splits to readers 0 [FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)]
24/07/02 13:40:26 INFO FakeSourceReader: 16 rows of data have been generated in split(fake_1) for table fake. Generation time: 1719898826259
24/07/02 13:40:26 INFO FakeSourceReader: 16 rows of data have been generated in split(fake_0) for table fake. Generation time: 1719898826259
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=1  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : eMaly, 2131476727
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : Osfqi, 257240275
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=1  rowIndex=2:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : BYVKb, 730735331

看结果符合预期,也就是使用 spark 提交 seatunnl引擎的流任务,通过FakeSource模拟两列输出了 16 条数据。看来的确是需要 spark3.2.x版本的才能成功了。


参考

https://www.modb.pro/db/605827

总结

本节主要总结了单机模式下使用 seatunel完成官方示例程序,初步体会使用,其实使用起来还是很简单的,模式同我之前介绍的 DataX如出一辙,可喜的是它有自己的 web页面可以配置,
因此后面我将分享下如何在页面中进行配置同步任务,最后时间允许的情况下,分析起优秀的源码设计思路,千里之行始于足下,要持续学习,持续成长,然后持续分享,再会~。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1888397.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚拟机交叉编译基于ARM平台的opencv(ffmpeg/x264)

背景: 由于手上有一块rk3568的开发板,需要运行yolov5跑深度学习模型,但是原有的opencv不能对x264格式的视频进行解码,这里就需要将ffmpegx264编译进opencv。 但是开发板算力有限,所以这里采用在windows下,安…

2024年07年01日 Redis数据类型以及使用场景

String Hash List Set Sorted Set String,用的最多,对象序列化成json然后存储 1.对象缓存,单值缓存 2.分布式锁 Hash,不怎么用到 1.可缓存经常需要修改值的对象,可单独对对象某个属性进行修改 HMSET user {userI…

Element中的选择器组件Select (一级选择组件el-select)

简述&#xff1a;在 Element UI 中&#xff0c;ElSelect&#xff08;或简称为 Select&#xff09;是一个非常常用的选择器组件&#xff0c;它提供了丰富的功能来帮助用户从一组预定义的选项中选择一个或多个值。这里来简单记录一下 一. 组件和属性配置 <el-selectv-model&q…

经典FC游戏web模拟器--EmulatorJS

简介 EmulatorJS是一个基于JavaScript和Webassembly技术的虚拟环境的实现&#xff0c;可以在网页中运行各种经典FC游戏系统&#xff0c;支持任天堂、世嘉、雅达利等经典红白机。EmulatorJS的诞生使得诸如超级玛丽、坦克大战、魂斗罗等经典FC游戏能够以一种全新的方式回归。本文…

MySQL:高效的索引

数据库索引 1. 索引介绍1.1 创建索引1.2 查看索引 2. 索引应用2.1 前缀索引2.2 全文索引2.3 复合索引2.4 复合索引中的列顺序2.5 建立最佳索引2.6 使用索引排序2.7 覆盖索引 3. 维护索引4. 建立性能数据库 索引对大型和高并发数据库非常有用&#xff0c;因为它可以显著提升查询…

KVM虚拟机动态添加网卡

一、在宿主机上临时在线添加虚拟网卡&#xff0c;关机再开机失效 1、查看运行的虚拟机 [rootlocalhost img]# virsh list 2、添加NAT网卡&#xff0c;会自动获取192.168.122.X网段的IP virsh attach-interface hadoop01 --type network --source default 3、查看虚机mac …

vue+element-ui简洁完美实现个人博客“​响石潭 ​”

目录 一、项目介绍 二、项目截图 1.项目结构图 2.首页 3.生活 ​编辑 4.文章详情 ​编辑 5.关于我 ​编辑 ​编辑 三、源码实现 1.项目依赖package.json 2.项目启动 3.首页源码 四、总结 一、项目介绍 本项目在线预览&#xff1a;点击访问 参考官网&#xff1…

360的chromesafe64.dll动态链接库导致chrome和edge浏览器闪退崩溃关闭

在chrome或edge浏览器中打开特定的一些网页会导致浏览器闪退关闭 这是windows系统记录的报错日志 chrome报错日志 edge报错日志 日志指向的就是chromesafe64.dll这个动态库 然后用everything搜索发现原来在360安装目录下 360安装目录下的chromesafe64.dll文件 为什么360中的…

使用TensorFlow进行OCR识别:将表格图片转换为结构化数据

随着人工智能和机器学习技术的不断发展&#xff0c;OCR&#xff08;Optical Character Recognition&#xff0c;光学字符识别&#xff09;技术已经成为处理图像中文本信息的强大工具。TensorFlow是一个广泛使用的开源机器学习框架&#xff0c;它提供了丰富的API和工具&#xff…

独立开发者系列(17)——MYSQL的常见异常整理

虽然安装MYSQL到本地很简单&#xff0c;但是数据库报错还是经常出现&#xff0c;这个时候&#xff0c;需要我们进行逐步检查与修复。作为我们最常用的开发软件&#xff0c;无论切换php/go/python/node/java&#xff0c;数据库的身影都少不了&#xff0c;对于我们储存数据而言&a…

Android 如何通过一个设备开发多种分辨率屏幕UI

获取当前屏幕密度&#xff1a; adb shell wm density 获取当前分辨率&#xff1a; adb shell wm size 重置设备密度和分辨率 adb shell wm size reset adb shell wm density reset 屏幕1 adb shell wm size 3082x934 adb shell wm density 160 屏幕2 adb shell wm siz…

【数据结构与算法】利用堆结构高效解决TopK问题

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a;《数据结构与算法》 期待您的关注 ​ 目录 一、引言 二、堆的基本概念 三、使用堆解决TopK问题 四、算法实现&#xff08;C语言…

HTTPS基础

目录 1. HTTPS概述2. HTTPS工作原理3. HTTPS证书4. HTTPS安全性特性5. 配置HTTPS示例5.1 获取和配置SSL/TLS证书5.2 示例&#xff1a;在Nginx上配置HTTPS5.3 实施HSTS 6. 结论 1. HTTPS概述 术语描述HTTPS超文本传输安全协议&#xff0c;HTTP的安全版本。SSL/TLS安全套接字层/…

UG NX二次开发(C++)-根据草图创建拉伸特征(UFun+NXOpen)

1、前言 UG NX是基于特征的三维建模软件,其中拉伸特征是一个很重要的特征,有读者问如何根据草图创建拉伸特征,我在这篇博客中讲述一下草图创建拉伸特征的UG NX二次开发方法,感兴趣的可以加入QQ群:749492565,或者在评论区留言。 2、在UG NX中创建草图,然后创建拉伸特征 …

uniapp + vue3 + Script Setup 写法变动 (持续更新)

一、uniapp 应用生命周期&#xff1a; https://uniapp.dcloud.net.cn/tutorial/vue3-composition-api.html 注意&#xff1a; 应用生命周期仅可在App.vue中监听&#xff0c;在其它页面监听无效。 二 、uniapp页面生命周期&#xff1a; https://uniapp.dcloud.net.cn/tutori…

电商控价:系统监测的必要性与优势

在品牌的发展进程中&#xff0c;会遭遇各种各样的渠道问题&#xff0c;控价乃是其中颇为关键的一环。品牌进行控价的目的无疑是为了妥善治理低价链接&#xff0c;低价链接的发现途径可以是人工&#xff0c;也可以是系统。力维网络在为上百个品牌提供服务的过程中察觉到&#xf…

中南大学湘雅三院张如旭/刘爱华团队发现牙髓干细胞来源的外泌体减轻脑缺血再灌注损伤的神经保护机制

随着我国人口老龄化的加剧&#xff0c;中风已成为我国主要的公共卫生疾病之一&#xff0c;确定其潜在的分子机制和治疗靶点对于开发有效的预防和治疗策略至关重要。近期&#xff0c;中南大学湘雅第三医院张如旭、刘爱华团队在经典权威期刊《Pharmacological Research》&#xf…

在 Mac 上使用 MLX 微调微软 phi3 模型

微调大语言模型是常见的需求&#xff0c;由于模型参数量大&#xff0c;即使用 Lora/Qlora 进行微调也需要 GPU 显卡&#xff0c;Mac M系是苹果自己的 GPU&#xff0c;目前主流的框架还在建立在 CUDA 的显卡架构&#xff0c;也就是主要的卡还是来自英伟达。如果要用 Mac 来做训练…

【AI提升】如何使用大模型:本机离线和FastAPI服务调用

大模型本身提供的功能&#xff0c;类似于windows中的一个exe小工具&#xff0c;我们可以本机离线调用然后完成具体的功能&#xff0c;但是别的机器需要访问这个exe是不可行的。常见的做法就是用web容器封装起来&#xff0c;提供一个http接口&#xff0c;然后接口在后端调用这个…

单目行车测距摄像系统(单目测距-行车)

单目行车测距摄像系统是一种利用单个摄像头实现车辆行驶中前方障碍物距离测量的技术。该系统通过计算机视觉算法&#xff0c;能够实时分析摄像头捕捉的图像&#xff0c;精确计算出车辆与前方物体之间的距离&#xff0c;对于自动驾驶、高级驾驶辅助系统&#xff08;ADAS&#xf…