【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程

news2025/1/18 3:18:16

Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章

file

本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前版本不兼容,所以需要对 SeaTunnel-Web的源码进行修改适配。

源码修改编译

克隆SeaYunnel-Web源码到本地

  git  clone https://github.com/apache/seatunnel-web.git

在idea中打开项目

升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖

  <seatunnel-framework.version>2.3.3</seatunnel-framework.version>
  改为
  <seatunnel-framework.version>2.3.4</seatunnel-framework.version>

因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题,所以本篇文章重点来了:我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改,修改完之后,我们就能完全适配2.3.4最新版本。

社区推出了2.3.X及Web系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。

org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType

public static class SeaTunnelDataTypeConvertor
        implements DataTypeConvertor<SeaTunnelDataType<?>> {

    @Override
    public SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {
        return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();
    }

    @Override
    public SeaTunnelDataType<?> toSeaTunnelType(
            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
            throws DataTypeConvertException {
        return seaTunnelDataType;
    }

    @Override
    public SeaTunnelDataType<?> toConnectorType(
            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
            throws DataTypeConvertException {
        return seaTunnelDataType;
    }

    @Override
    public String getIdentity() {
        return "EngineDataTypeConvertor";
    }
}
// 改为
public static class SeaTunnelDataTypeConvertor
            implements DataTypeConvertor<SeaTunnelDataType<?>> {

        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {
            return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();
        }

        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(
                String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
            return seaTunnelDataType;
        }

        @Override
        public SeaTunnelDataType<?> toConnectorType(
                String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
            return seaTunnelDataType;
        }

        @Override
        public String getIdentity() {
            return "EngineDataTypeConvertor";
        }
    }

org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

public TableSchemaServiceImpl() throws IOException {
    Common.setStarter(true);
    Set<PluginIdentifier> pluginIdentifiers =
            SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
    ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
    pluginIdentifiersList.addAll(pluginIdentifiers);
    List<URL> pluginJarPaths =
            new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
    //        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
    if (!pluginJarPaths.isEmpty()) {
        //            List<URL> files = FileUtils.searchJarFiles(path);
        pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
        factory =
                new DataTypeConvertorFactory(
                        new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
    } else {
        factory = new DataTypeConvertorFactory();
    }
}
// 改为
    public TableSchemaServiceImpl() throws IOException {
        Common.setStarter(true);
        Set<PluginIdentifier> pluginIdentifiers =
                SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
        ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
        pluginIdentifiersList.addAll(pluginIdentifiers);
        List<URL> pluginJarPaths =
                new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
        //        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
        if (!pluginJarPaths.isEmpty()) {
            //            List<URL> files = FileUtils.searchJarFiles(path);
            pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
            factory =
                    new DataTypeConvertorFactory(
                            new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
        } else {
            factory = new DataTypeConvertorFactory();
        }
    }

SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
// 改为
SeaTunnelDataType<?> dataType =
                    convertor.toSeaTunnelType(field.getName(), field.getType());

org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

 public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
        Common.setDeployMode(DeployMode.CLIENT);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(jobInstanceId + "_job");
        try {
            SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
            SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
            ClientJobExecutionEnvironment jobExecutionEnv =
                    seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
                final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
            JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
            jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
            jobInstanceDao.update(jobInstance);

            CompletableFuture.runAsync(
                    () -> {
                        waitJobFinish(
                                clientJobProxy,
                                userId,
                                jobInstanceId,
                                Long.toString(clientJobProxy.getJobId()),
                                seaTunnelClient);
                    });

        } catch (ExecutionException | InterruptedException e) {
            ExceptionUtils.getMessage(e);
            throw new RuntimeException(e);
        }
        return jobInstanceId;
    }

org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

else if (statusList.contains("CANCELLING")) {
            jobStatus = JobStatus.CANCELLING.name();
// 改为
else if (statusList.contains("CANCELING")) {
            jobStatus = JobStatus.CANCELING.name();

org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

TableFactoryContext context =
        new TableFactoryContext(
                Collections.singletonList(table),
                ReadonlyConfig.fromMap(config),
                Thread.currentThread().getContextClassLoader());
// 改为
TableTransformFactoryContext context =
                new TableTransformFactoryContext(
                        Collections.singletonList(table),
                        ReadonlyConfig.fromMap(config),
                        Thread.currentThread().getContextClassLoader());

org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

public void restoreJob(
            @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
        SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(jobInstanceId + "_job");
        try {
            seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
}
// 改为
public void restoreJob(
        @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
        SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(jobInstanceId + "_job");
        SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
        try {
            seaTunnelClient
                .restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
                .execute();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
        PluginType pluginType) throws IOException {
    Common.setStarter(true);
    if (!pluginType.equals(PluginType.SOURCE)) {
        throw new UnsupportedOperationException("ONLY support plugin type source");
    }
    Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
    List<Factory> factories;
    if (path.toFile().exists()) {
        List<URL> files = FileUtils.searchJarFiles(path);
        factories =
                FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
    } else {
        factories =
                FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
    }
    Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
    factories.forEach(
            plugin -> {
                if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                    TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                    PluginIdentifier info =
                            PluginIdentifier.of(
                                    "seatunnel",
                                    PluginType.SOURCE.getType(),
                                    plugin.factoryIdentifier());
                    featureMap.put(
                            info,
                            new ConnectorFeature(
                                    SupportColumnProjection.class.isAssignableFrom(
                                            tableSourceFactory.getSourceClass())));
                }
            });
    return featureMap;
}
// 改为

    public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
            PluginType pluginType) {
        Common.setStarter(true);
        if (!pluginType.equals(PluginType.SOURCE)) {
            throw new UnsupportedOperationException("ONLY support plugin type source");
        }

        ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();
        pluginIdentifiers.addAll(
                SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());
        List<URL> pluginJarPaths =
                new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);

        List<Factory> factories;
        if (!pluginJarPaths.isEmpty()) {
            factories =
                    FactoryUtil.discoverFactories(
                            new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
        } else {
            factories =
                    FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
        }
        Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
        factories.forEach(
                plugin -> {
                    if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                        TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                        PluginIdentifier info =
                                PluginIdentifier.of(
                                        "seatunnel",
                                        PluginType.SOURCE.getType(),
                                        plugin.factoryIdentifier());
                        featureMap.put(
                                info,
                                new ConnectorFeature(
                                        SupportColumnProjection.class.isAssignableFrom(
                                                tableSourceFactory.getSourceClass())));
                    }
                });
        return featureMap;

代码格式化

mvn spotless:apply

编译打包

mvn clean package -DskipTests

至此,seatunnel web 适配 seatunnel2.3.4版本完成,对应的安装包会在 seatunnel-web-dist/target目录下生成

Linux部署测试

这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南

重要的配置项

1、seatunnel-web数据库相关配置(application.yml) 
用来web服务中的数据持久化

2、SEATUNNEL_HOME(环境变量)
seatunnel-web调用seaunnel的插件获取的API,扫描connector相关的连接器

3、ST_WEB_HOME(环境变量)
seatunnel-web会加载seatunnel-web/datasource下的插件包,这里决定了seatunnel-web支持哪些数据源的任务定义

4、重要的配置文件:
connector-datasource-mapper.yaml 
该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持cdc等)
hazelcast-client.yaml 
seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互,该配置文件配置了集群节点等相关信息

感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1518871.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

备战蓝桥杯Day27 - 省赛真题-2023

题目描述 大佬代码 import os import sysdef find(n):k 0for num in range(12345678,98765433):str1 ["2","0","2","3"]for x in str(num) :if x in str1:if str1[0] x:str1.pop(0)if len(str1) ! 0:k1print(k)print(85959030) 详…

Qt 图形视图 /基于Qt示例DiagramScene解读图形视图框架

文章目录 概述从帮助文档看示例程序了解程序背景/功能理清程序概要设计 分析图形视图的协同运作机制如何嵌入到普通Widget程序中&#xff1f;形状Item和文本Item的插入和删除&#xff1f;连接线Item与形状Item的如何关联&#xff1f;如何绘制ShapeItem间的箭头线&#xff1f; 下…

Centos strema 9 环境部署Glusterfs9

本文档只是创建复制卷&#xff0c;分布式卷&#xff0c;分布式复制卷&#xff0c;纠删卷 操作系统 内核 角色 Ip地址 说明 CentOS Stream 9 x86_64 5.14.0-427.el9.x86_64 客户端 client 192.168.80.119 挂载存储业务机器 CentOS Stream 9 x86_64 5.14.0-427.el9.x8…

Git的介绍

导出项目依赖 # 以后项目给别人需要导出项目依赖&#xff0c;放在项目路径下&#xff0c;以后在运行项目前&#xff0c;先安装依赖 一般约定俗成都叫 requirements.txt,但是会有别的&#xff1a;req.txt | dev.txt # 两种方式&#xff1a; 1、虚拟环境所有装的第三方&…

分享6款非常炫酷的前端动画特效(附在线演示)

分享6款非常不错的项目动画特效 其中有three.js特效、canvas特效、CSS动画、SVG特效等等 下方效果图可能不是特别的生动 那么你可以点击在线预览进行查看相应的动画特效 同时也是可以下载该资源的 Three.js 3D游戏场景动画特效 基于Three.js的HTML5 3D动画&#xff0c;这个动…

【学习】感受野

感受野&#xff08;receptive field&#xff09;是指在神经网络中&#xff0c;某一层输出的特征图上的一个像素点对应输入图像的区域大小。在深度神经网络中&#xff0c;随着网络层数的增加&#xff0c;特征图的感受野也会逐渐增大。这是因为每一层的卷积操作都会扩大感受野。 …

python面向对象的三大特性:封装,继承,多态

1、面向对象有哪些特性 三种&#xff1a;封装性、继承性、多态性 2、Python中的封装 在Python代码中&#xff0c;封装有两层含义&#xff1a; ① 把现实世界中的主体中的属性和方法书写到类的里面的操作即为封装 ② 封装可以为属性和方法添加为私有权限&#xff0c;不能直…

设计模式二三事(含基础使用示例)

设计模式是众多软件开发人员经过长时间的试错和应用总结出来的&#xff0c;解决特定问题的一系列方案。现行的部分教材在介绍设计模式时&#xff0c;有些会因为案例脱离实际应用场景而令人费解&#xff0c;有些又会因为场景简单而显得有些小题大做。 本文会结合在美团金融服务…

苹果Find My App用处多多,产品认准伦茨科技ST17H6x芯片

苹果发布AirTag发布以来&#xff0c;大家都更加注重物品的防丢&#xff0c;苹果的 Find My 就可以查找 iPhone、Mac、AirPods、Apple Watch&#xff0c;如今的Find My已经不单单可以查找苹果的设备&#xff0c;随着第三方设备的加入&#xff0c;将丰富Find My Network的版图。产…

【Git】error: bad signature 0xb86f1e1 和 bfatal: index file corrupt

一、问题 之前都好好的&#xff0c;今天执行 git add .的时候突然报错 报错原因翻译成中文&#xff1a;索引文件损坏 二、解决方法 方法1&#xff1a; 删除.git隐藏文件夹中的index文件 然后执行 git reset 重新生成index文件 git reset 方法2&#xff1a; 重新从远程克隆…

用户和组及权限管理

用户至少属于一个组,在创建时如果不指定组,将会创建同名的组 用户只能有一个基本组(主组),但可以隶属于多个附加组 如果一个组作为某用户的基本组,此组将不能被删除 UID: 用户标识 GID: 组的标识 root管理员的uid及gid 都为0 用户的配置文件: 1./etc/passwd test:x:1000:1000…

文献速递:深度学习乳腺癌诊断---基于深度学习的图像分析预测乳腺癌中HE染色组织病理学图像的PD-L1状态

Title 题目 Deep learning-based image analysis predicts PD-L1 status from H&E-stained histopathol ogy images in breast cancer 基于深度学习的图像分析预测乳腺癌中H&E染色组织病理学图像的PD-L1状态 01 文献速递介绍 编程死亡配体-1&#xff08;PD-L1&…

代码随想录算法训练营三刷day24 | 回溯算法 之 理论基础 77. 组合

三刷day24 理论基础77. 组合递归函数的返回值以及参数回溯函数终止条件单层搜索的过程 理论基础 回溯法解决的问题都可以抽象为树形结构。 因为回溯法解决的都是在集合中递归查找子集&#xff0c;集合的大小就构成了树的宽度&#xff0c;递归的深度&#xff0c;都构成的树的深…

网络安全,硬防迪云

要减少被攻击的频率&#xff0c;游戏开发者可以采取以下措施&#xff1a; 1. 强化安全措施&#xff1a;确保游戏服务器和用户数据的安全性&#xff0c;加密网络传输&#xff0c;防止黑客攻击和数据泄露。 2. 更新和修复漏洞&#xff1a;定期检查游戏代码和服务器&#xff0c;…

css3 实现html样式蛇形布局

文章目录 1. 实现效果2. 实现代码 1. 实现效果 2. 实现代码 <template><div class"body"><div class"title">CSS3实现蛇形布局</div><div class"list"><div class"item" v-for"(item, index) …

如何使用第三方接入淘宝商品详情(主图,详情图)

1、找到可用的API接口&#xff1a;首先&#xff0c;需要找到支持查询商品信息的API接口。这些信息通常可以在电商平台的官方文档或开发者门户网站上找到。 2、注册并获取API密钥&#xff1a;在使用API接口之前&#xff0c;需要注册并获取API密钥。API密钥是识别身份的唯一标识符…

区块链技术中的共识机制算法:以权益证明(PoS)为例

引言&#xff1a; 在区块链技术的演进过程中&#xff0c;共识机制算法扮演着至关重要的角色。除了广为人知的工作量证明&#xff08;PoW&#xff09;外&#xff0c;权益证明&#xff08;Proof of Stake&#xff0c;PoS&#xff09;也是近年来备受关注的一种共识算法。 …

C# 读取多条数据记录导出到 Word 标签模板

目录 应用需求 实现步骤 范例运行环境 配置Office DCOM 实现代码 组件库引入 ​编辑 核心代码 小结 应用需求 将数据库数据表中的数据输出并打印&#xff0c;WORD 是一个良好的载体&#xff0c; 在应用项目里&#xff0c;许多情况下我们会使用数据记录结合 WORD 标签模…

Halcon OCR文字识别

1、OCR文字识别 FontFile : Universal_0-9_NoRej dev_update_window (off) read_image (bottle, bottle2) get_image_size (bottle, Width, Height) dev_open_window (0, 0, Width, Height, black, WindowHandle) set_display_font (WindowHandle, 16, mono, true, false) dev…

妇女节专访|勇敢踏入未知领域,她的 Web3 奇妙之旅

Web3 的出现席卷着数字世界的剧烈变革&#xff0c;让每个人与互联网和数字资产互动的方式产生了深刻的变化。Web3 所强调的去中心化特征&#xff0c;使其成为人们对理想未来世界的一个缩影。而作为一个以技术为核心的新兴领域&#xff0c;Web3 也难以避免传统认知中男性占主导地…