王炸组合:Dolphinscheudler 3.1.*搭配SeaT unnel2.3.*高效完成异构数据数据集成

news2025/1/16 3:18:49

file

概述

本篇主要介绍如何通过Dolphinscheduler海豚调度搭配Seatunnel完成异构数据源之间的数据同步功能,这个在大数据流批一体数仓建设的过程中是一个非常好的解决方案, 稳定高效,只要用上了你肯定爱不释手。

环境准备

  • dolphinscheduler集群 >= 3.1.5
  • dolphinscheduler3.1.5版本源码
  • Seatunnel集群 >= 2.3.3

没有安装好以上准备环境的童鞋,请先参考我的另外两篇文章完成基础环境搭建基于Seatunnel最新2.3.5版本分布式集群安装部署指南(小白版)及dolphinscheduler分布式集群部署指南(小白版)再回到章节继续。

配置文件修改

这里说明一下, 通过海豚调度配置的Seatunnel数据同步任务最后都会被分配到DS集群的某个Worker组或者某个worker节点上进行执行,所以你要保证你的DS集群的目标worker节点上也安装了Seatunnel服务。这很重要,因为实际dolphisncheduler中定义的seatunnel任务实例到最后都是需要调用worker节点上的seatunnel服务在本地执行seatunnel的任务启动命令来完成任务提交和运行。

Dolphinscheduler的配置文件修改

因为我们需要使用seatunnel完成数据集成,所以我们需要在dolphinscheduler的系统环境变量中将我们的Seatunnel的安装目录进行配置。

找到你的dolphinscheduler主节点的安装目录下的$DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh

设置SEATUNNEL_HOME的访问目录,将SEATUNNEL_HOME设置为你自己的SeaTunnel安装目录。

export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5}

然后保存重启Dolphinscheduler集群即可完成配置修改同步到所有的api-server、master-server及worker-server节点。

Dolphinscheduler部分源码修改

为什么要修改Dolphinscheduler的源码? 因为我这里使用的Seatunnel的版本是2.3.5,使用的引擎不是Seatunnel的默认引擎, 用的是Spark引擎, Spark我用的版本是2.4.5, 所有我最后在命令执行的命令如下:

$SEATUNNEL_HOME/bin/start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template

如果我用的是Spark3.X的版本,我执行命令如下:

$SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template

然而在Dolphinscheduler3.1.5版本的Seatunnel任务插件中,存在一些问题没办法兼容, 首先是前端,这里引擎只支持Spark和Flink,没有针对具体的版本进行兼容,没办法自由的选择使用Spark2、Spark3还是FIink13、Flink15。 file

其次就是后端的代码。 file

找到EngineEnum类, 修改一下代码如下: file

public enum EngineEnum {

    // FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"),
    // SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh");
    FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-13-connector-v2.sh"),
    FLINK15("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh"),
    SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-2-connector-v2.sh"),
    SPARK3("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-3-connector-v2.sh");

    private String command;

    EngineEnum(String command) {
        this.command = command;
    }

    public String getCommand() {
        return command;
    }
}

这样修改完毕之后就OK了, 然后编译打包dolphinscheduler的源码。

更新Dolphinscheduler集群中的SeaTunnel任务插件

项目编译打包完成之后,找到dolphinscheduler-task-seatunnel工程下的target目录下的dolphinscheduler-task-seatunnel-3.1.5.jar包, 上传到你的dolphinscheduler集群的主节点。

file

然后将主节点上DS安装目录下的api-server/libsmaster-server/libsworker-server/libsalert-server/libs目录(其实这里可以只替换woker-server/libs目录)下的dolphinscheduler-task-seatunnel-3.1.5.jar重命名为dolphinscheduler-task-seatunnel-3.1.5.jar.20240606(带上日期方便知道变更时间)。

file

然后将我们编译的dolphinscheduler-task-seatunnel-3.1.5.jar拷贝到这几个目录(api-server/libs、master-server/libs、worker-server/libs、alert-server/libs目录,确认一下是不是所有目录下都有这个dolphinscheduler-task-seatunnel-3.1.5.jar,没有的目录就直接略过)下。

然后使用主节点上的分发脚本,将api-server/libsmaster-server/libsworker-server/libsalert-server/libs的修改同步到其他的DS节点上,分发完成之后,检查一下分发是否成功。

最后就是重启我们的DS集群,通过以上步骤我们就完成了Dolphisncheduler中SeaTunnel插件的升级适配。

测试验证

我们通过dolphinscheduler的工作流定义页面定义一个Seatunnel数据同步的任务, 完成Oracle数据库表采集到MySQL数据库的任务, 下面我们来操作。

关于seatunnel任务配置脚本文件,官网的文档介绍如下:

  • Source: https://seatunnel.incubator.apache.org/zh-CN/docs/category/source-v2
  • Transform: https://seatunnel.incubator.apache.org/zh-CN/docs/category/transform-v2
  • Sink: https://seatunnel.incubator.apache.org/zh-CN/docs/category/sink-v2/

Source输入源配置定义说明

这里我们的输入原始Oracle, 所以直接从Source中查找Oracle相关的配置如何定义,官网给我们提供了不少任务示例,:

简单任务示例
# Defining the runtime environment
env {
  parallelism = 4
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        query = "SELECT * FROM TEST_TABLE"
    }
}

transform {}

sink {
    Console {}
}
按分区列并行任务示例

并行读取你配置的分片字段和分片数据,如果你想读取整个表,可以这样做

env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "123456"
        # 根据需要定义查询逻辑
        query = "SELECT * FROM TEST_TABLE"
        # 设置并行分片读取字段
        partition_column = "ID"
        # 分区切片数量
        partition_num = 10
        properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
        }
    }
}
sink {
  Console {}
}
按主键或唯一索引并行任务示例

配置table_path会开启自动分割,可以配置split.*来调整分割策略

env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "123456"
        table_path = "DA.SCHEMA1.TABLE1"
        query = "select * from SCHEMA1.TABLE1"
        split.size = 10000
    }
}

sink {
  Console {}
}
并行上下限任务示例

指定查询的上下限内的数据效率更高按照你配置的上下限来读取你的数据源效率更高

source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "123456"
        # Define query logic as required
        query = "SELECT * FROM TEST_TABLE"
        partition_column = "ID"
        # Read start boundary
        partition_lower_bound = 1
        # Read end boundary
        partition_upper_bound = 500
        partition_num = 10
    }
}
多表读取任务示例

配置table_list会开启自动分割,可以通过配置split.来调整分割策略*

env {
  job.mode = "BATCH"
  parallelism = 4
}
source {
  Jdbc {
    url = "jdbc:oracle:thin:@datasource01:1523:xe"
    driver = "oracle.jdbc.OracleDriver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "123456"
    "table_list"=[
        {
            "table_path"="XE.TEST.USER_INFO"
        },
        {
            "table_path"="XE.TEST.YOURTABLENAME"
        }
    ]
    #where_condition= "where id > 100"
    split.size = 10000
    #split.even-distribution.factor.upper-bound = 100
    #split.even-distribution.factor.lower-bound = 0.05
    #split.sample-sharding.threshold = 1000
    #split.inverse-sampling.rate = 1000
  }
}

sink {
  Console {}
}

Sink输出源配置定义说明

简单任务示例

本示例定义了一个SeaTunnel同步任务,通过FakeSource自动生成数据并发送到JDBC Sink。FakeSource一共生成16行数据(row.num=16),每行有两个字段name(string类型)和age(int类型)。最终的目标表为test_table,表中同样会有16行数据。运行此作业之前,你需要在mysql中创建数据库test和表test_table。如果你还没有安装和部署SeaTunnel,你需要按照安装SeaTunnel中的说明安装并部署SeaTunnel。然后按照快速开始使用SeaTunnel引擎中的说明运行此作业。

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    parallelism = 1
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {}

sink {
    jdbc {
        url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "123456"
        query = "insert into test_table(name,age) values(?,?)"
        }
}
生成输出SQL任务示例

本示例无需编写复杂的sql语句,您可以配置输出端数据库名称表名称来自动为您生成添加语句

sink {
    jdbc {
        url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "123456"
        # 根据数据库表名自动生成sql语句
        generate_sink_sql = true
        database = test
        table = test_table
    }
}
精确任务示例

对于需要精确写入场景,我们保证精确一次。

sink {
    jdbc {
        url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        max_retries = 0
        user = "root"
        password = "123456"
        query = "insert into test_table(name,age) values(?,?)"
        is_exactly_once = "true"
        xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
    }
}
CDC(变更数据捕获)事件

我们也支持CDC变更数据在这种情况下,您需要配置数据库,表和primary_keys。

sink {
    jdbc {
        url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "123456" 
        generate_sink_sql = true
        # You need to configure both database and table
        database = test
        table = sink_table
        primary_keys = ["id","name"]
        field_ide = UPPERCASE
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        data_save_mode="APPEND_DATA"
    }
}

完整测试脚本配置文件

下面给出本示例中完整的配置文件示例

env {
  parallelism = 4
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:oracle:thin:@192.168.11.101:15210:YLAPP"
        driver = "oracle.jdbc.OracleDriver"
        user = "appuser001"
        password = "appuser001"
        query = "SELECT * FROM YL_APP.MET_COM_ICDOPERATION_LS"
    }
}

transform {}

sink {
    jdbc {
        url = "jdbc:mysql://192.168.10.210:13306/yl-app-new?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "appuser001"
        password = "appuser001" 
        generate_sink_sql = "true"
        database = "hive"
        table = "met_com_icdoperation_ls"
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        data_save_mode="APPEND_DATA"
    }
}

file

将上述脚本中的数据库配置信息修改成你的数据连接配置, 然后将脚本覆盖到上图脚本输入中, 保存工作流, 上线之后启动工作流。

file

到对应数据库验证

原来的Oracle数据库表

file

同步之后的MySQL数据库表

file

任务成功了, 数据也成功同步过来了, OK,测试通过!大家接下来可以在这个Demo的基础上进行更多的扩展和挖掘, 实战的多了, 你对于Dolphinscheduler和Seatunnel的架构和原理的理解就会越来越深入了,慢慢你也可以通过扩展源码来升级和拓展这些优秀开源框架的功能了。创作不易,如果我的文章对你有帮助欢迎点赞,收藏,送你一朵小红花~

原文链接:https://blog.csdn.net/qq_41865652/article/details/140971419

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2277312.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【AI日记】25.01.11 Weights Biases | AI 笔记 notion

【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】【读书与思考】 AI kaggle 比赛:Forecasting Sticker Sales笔记:我的 AI 笔记主要记在两个地方 有道云笔记:数学公式和符号比较多的笔记notion:没什么数学公式的…

Oracle EBS GL定期盘存WIP日记账无法过账数据修复

系统环境 RDBMS : 12.1.0.2.0 Oracle Applications : 12.2.6 问题症状 用户反映来源为“定期盘存”和类别为“WIP”的日记账无法过账,标准日记账的界面上的过账按钮灰色不可用。但是,在超级用户职责下,该日记账又可以过账,细心检查发现该业务实体下有二个公司段值15100和…

欧拉路径算法

欧拉图: 对于应该连通图G,有: 1欧拉路径:一条路径,它能够不重复地遍历完所有的边,这个性质很像不重复地一笔画完所有边,所以有些涉及到欧拉路径的问题叫做一笔画问题。 2欧拉回路&#xff1a…

【进程与线程】程序和进程在内存中的表现

在计算机系统中,程序和进程是两个密切相关但又有本质区别的概念,尤其在内存中的表现上有显著不同: 在这张图中可以直观地看出程序和进程在内存中的结构区别。 基本定义 程序 程序 是一个 静态实体,表示一组写好的指令和数据的…

“多维像素”多模态雷视融合技术构建自动驾驶超级感知能力|上海昱感微电子创始人蒋宏GADS演讲预告

2025年1月14日,第四届全球自动驾驶峰会将在北京中关村国家自主创新示范区展示交易中心-会议中心举行。经过三年的发展,全球自动驾驶峰会已经成长为国内自动驾驶领域最具影响力、规模最大的产业峰会之一。昱感微电子创始人&CEO受到主办方邀请&#xf…

Linux创建server服务器实现多方信息收发

一,服务端 1.创建socket套接字,用于网络通信,同一台机器上的进程也可以通过本地套接字进行通信 //1.socket s_fd socket(AF_INET,SOCK_STREAM,0); if(s_fd -1){ perror("socket"); exit(-1); } //server address s_addr.sin_fami…

UML系列之Rational Rose笔记七:状态图

一、新建状态图 依旧是新建statechart diagram; 二、工作台介绍 接着就是一个状态的开始:开始黑点依旧可以从左边进行拖动放置: 这就是状态的开始,和活动图泳道图是一样的;只能有一个开始,但是可以有多个…

jsx语法中el-table-v2中cellRender如何使用动态绑定

答案::attribute"xx"改为attribute{xx} 改写: const columns ref([{ key: index, dataKey: index, title: t(setting.index), width: 100 },{ key: no, dataKey: no, title: t(setting.key), width: 100 },{ key: name, dataKey: name, tit…

【初识扫盲】厚尾分布

厚尾分布(Fat-tailed distribution)是一种概率分布,其尾部比正态分布更“厚”,即尾部的概率密度更大,极端值出现的概率更高。 一、厚尾分布的特征 尾部概率大 在正态分布中,极端值(如距离均值很…

EFK采集k8s日志

在 Kubernetes 集群中,需要全面了解各个 pod 应用运行状态、故障排查和性能分析。但由于 Pod 是动态创建和销毁的,其日志分散且存储不持久,因此需要通过集中式日志采集方案,将日志收集到统一的平台并配置日志可视化分析和监控告警…

HTML5教程(中)

HTML5 浏览器支持 HTML5 浏览器支持 目前市面上的浏览器有很多版本,你可以让一些较早的浏览器(不支持HTML5)支持 HTML5。 HTML5 浏览器支持 现代的浏览器都支持 HTML5。 此外,所有浏览器,包括旧的和最新的&#xff…

OpenCV实现彩色图像的直方图均衡化

1、直方图均衡化 在OpenCV中,equalizeHist函数用于直方图均衡化(Histogram Equalization)。这是一种图像处理技术,旨在增强图像的对比度,特别是在图像的灰度值集中于某个范围时非常有用。通过调整图像的灰度分布&…

速通nvm安装配置全程无废话

速通nvm安装配置全程无废话 1、安装包 通过网盘分享的文件:nvm-setup-1.1.11.zip等2个文件 链接: https://pan.baidu.com/s/1nk7pAFhhnHXDIIYRJLFqNw 提取码: niw8 --来自百度网盘超级会员v3的分享2、下载安装 nvm安装路径:D:\dev\nvm nodejs路径&am…

JUC Java并发编程 高级 学习大纲 动员

目录 口诀 锁 阿里巴巴开发规范 字节面试题 面试题 1 面试题 2 鼓舞 口诀 高内聚低耦合前提下 封装思想 线程 -- 操作 -- 资源类 判断、干活、通知防止虚假唤醒 ,wait 方法要注意注意标志位 flag 可能是 volatile 的 锁 阿里巴巴开发规范 参考书 并发编程…

Unity 3D游戏开发从入门进阶到高级

本文精心整理了Unity3D游戏开发相关的学习资料,涵盖入门、进阶、性能优化、面试和书籍等多个维度,旨在为Unity开发者提供全方位、高含金量的学习指南.欢迎收藏。 学习社区 Unity3D开发者 这是一个专注于Unity引擎的开发者社区,汇聚了众多Un…

国内源快速在线安装qt5.15以上版本。(10min安装好)(图文教程)

参考文章:Qt6安装教程——国内源-CSDN博客 1、在国内源上下载qt在线安装工具 NJU Mirror 2、 将下载好的在线安装工具,放到C盘根目录, 2.1 打开windows Powershell(WinX),下边那个最好。 输入两条指令&a…

[0405].第05节:搭建Redis主从架构

Redis学习大纲 一、3主3从的集群配置: 1.1.集群规划 1.分片集群需要的节点数量较多,这里我们搭建一个最小的分片集群,包含3个master节点,每个master包含一个slave节点,结构如下: 2.每组是一主一从&#x…

数据结构(Java版)第八期:LinkedList与链表(三)

专栏:数据结构(Java版) 个人主页:手握风云 目录 一、链表中的经典面试题 1.1. 链表分割 1.2. 链表的回文结构 1.3. 相交链表 1.4. 环形链表 一、链表中的经典面试题 1.1. 链表分割 题目中要求不能改变原来的数据顺序,也就是如上图所示。…

flutter R库对图片资源进行自动管理

项目中对资源的使用是开发过程中再常见不过的一环。 一般我们在将资源导入到项目中后,会通过资源名称来访问。 但在很多情况下由于我们疏忽输入错了资源名称,从而导致资源无法访问。 所以,急需解决两个问题: 资源编译期可检查可方…

doc、pdf转markdown

国外的一个网站可以: Convert A File Word, PDF, JPG Online 这个网站免费的,算是非常厚道了,但是大文件上传多了之后会扛不住 国内的一个网站也不错: TextIn-AI智能文档处理-图像处理技术-大模型加速器-在线免费体验 https://…