本地构建编译Apache-Seatunnel2.3.5适配Web1.0.0运行实现Mysql-CDC示例

news2025/1/10 17:49:09

本地构建编译Apache-Seatunnel2.3.5适配Web1.0.0运行实现Mysql-CDC示例

文章目录

  • 1.前言
  • 2.编译
    • 2.1版本说明
    • 2.2 seatunnel2.3.4-release分支配置
    • 2.3maven调优配置
  • 3.web1.0.0适配
    • 3.1配置文件修改和新增文件
    • 3.2手动拷贝jar修改依赖
    • 3.3修改web不兼容的代码
    • 3.4 web编译打包
  • 4.运行mysql-cdc示例
    • 4.1配置运行seatunnel
    • 4.2配置运行web
    • 4.3 拷贝jar到seatunnel家目录的lib和web家目录的libs下
    • 4.4 ui编译运行注意事项
    • 4.5mysql-cdc的示例
  • 5.总结

1.前言

  本地编译先关可以查看之前的文章

  Apache Seatunnel本地源码构建编译运行调试

https://mp.weixin.qq.com/s/s_cjrXHvBjIQBF8RqQTvcQ
https://blog.csdn.net/qq_34905631/article/details/135068301?spm=1001.2014.3001.5501

  CentOs7.x安装部署SeaTunnelWeb遇到的坑

https://mp.weixin.qq.com/s/1FcCB1TjfEs22iGiCoKL5g
https://blog.csdn.net/qq_34905631/article/details/135074860?spm=1001.2014.3001.5501

  主要看Apache Seatunnel本地源码构建编译运行调试这篇。

2.编译

2.1版本说明

  seatunnel的分支选择2.3.4-release里面的版本是

<version>2.3.5-SNAPSHOT</version>

image-20240430152130138

  先把seatunnel2.3.4-release的分支拉下来,拉下项目来记得切换分支到2.3.4-release分支,这一步特别关键,否则没有切换分支可能默认在dev分支,估计会有问题,这里需要特别注意

web版本为1.0.0。

2.2 seatunnel2.3.4-release分支配置

  如果你的电脑是windwos电脑可以将持久化配置成localfile,seatunnel源码文件有说明:localfile已经废弃,使用hdfs替代,因为我windows电脑上没有安装hadoop环境,所以这种搞一下本地耍没啥问题。

image-20240430152917348

  web的app中也可以加入这个包,下面web配置里面没有截图在这里说明下,加入这个包可选,后面运行的时候还需要把相关的jar包加加到web家目录的libs下和seatunnel家目录下的lib中。

image-20240430152953547

  编译打包:

mvn clean package -pl seatunnel-dist -am '-Dmaven.test.skip=true' -T 8C

2.3maven调优配置

  下面的文章的评论里面有有如何配置maven的调优

https://blog.csdn.net/qq_34905631/article/details/135074860?spm=1001.2014.3001.5501

  setting的maven的Runner中配置jvm参数如下:

-Dfile.encoding=GBK -DarchetypeCatalog=local -Xmx1024m -XX:MetaspaceSize=1024m -XX:MaxMetaspaceSize=1024m -Xss2m -Dmaven.test.skip=true -Dmaven.compile.fork=true

  对maven做了一些参数调优,否则maven导入编译打包会很慢,settings->maven->importing中的jvm参数可以设置为:

-Xmx1024m

  如果项目太大,不设置这个参数项目导入加载会很慢很慢,堆栈有可能溢出了,所以需要设置maven的导入的最大堆设置大一点,Archetype Catalogs里面新增一个local指定本地maven仓库的位置(这种是为了不用每次编译都去远程拉取下载,如果机器配置不好,网络慢,编译就非常非常慢,所以一般都是将各个模块分别install到本地仓库,然后执行总体编译打包命令的时候通过这个配置优选选用本地仓库的jar,就不会去远程下载了,这种就可以加快编译打包的速度),点击Build Tools–>Maven
设置maven的线程数据为:

Thread Count 8 -T option

  设置输入一个8即可。以上配置是为了导入maven项目加载快和编译运行maven项目快。两个项目里面都这种maven调优配置一下

3.web1.0.0适配

3.1配置文件修改和新增文件

image-20240430162758082

3.2手动拷贝jar修改依赖

  修改顶级父pom的版本号如下,该为2.3.5

image-20240430155800659

  lib用于从seatunnel本地编译拷贝一些依赖的jar,然后替换如下:

       <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-common</artifactId>
            <version>${seatunnel-framework.version}</version>
            <scope>system</scope>
            <systemPath>D:/other-workspace/seatunnel-web/lib/seatunnel-common-2.3.5-SNAPSHOT-2.12.15.jar</systemPath>
        </dependency>

  这里由于不可以在顶级的父pom里面修改,因为顶级父pom里面的依赖是在里面统一定义管理的,所以不要加在顶级父pom里面,如果加了会编译报错,里面不支持上面那种手动依赖编译打包本地的jar包的,所以需要修改各个子模块中依赖seatunnel那边的jar包全部拷贝到web项目下的lib下,上面只是一个引入栗子,systemPath这里采用绝对路径的方式统一放在一个lib下管理,systemPath不使用 p o m . b a s e d i r {pom.basedir} pom.basedir{project.basedir}的方式,这种方式不便于多子模块管理外部jar依赖 ,如果用这种方式哪个模块里面需要在对应模块的路径下建立一个lib,然后将对应的jar拷贝过去,不方便,拷过来拷过去的,很烦的,所以使用绝对路径就可以统一管理,需要将web下所有依赖seatunnel的包全部手动拷贝过去这种引入对应的jar即可,编译会报错,就去检查是哪个模块缺少seatunnel的那个依赖包,拷贝到lib然后修改pom依赖即可

image-20240430161805774

  需要手动将拷贝的lib下的jar包文件手动导入项目,否则idea编译后不能识别代码,会有类是红色的说明没有导入依赖,所以要这种手动导入一下才可以的,这里需要特别注意。

3.3修改web不兼容的代码

  上一步修改完依赖编译会有接口代码不兼容报错,所以需要修改web代码

  EngineDataType内部类修改:

public static class SeaTunnelDataTypeConvertor
            implements DataTypeConvertor<SeaTunnelDataType<?>> {

        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {
            return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();
        }

        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(
                String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
            return seaTunnelDataType;
        }

        @Override
        public SeaTunnelDataType<?> toConnectorType(
                String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
            return seaTunnelDataType;
        }

        @Override
        public String getIdentity() {
            return "EngineDataTypeConvertor";
        }
    }

  JobExecutorServiceImpl类

executeJobBySeaTunnel方法:
原来:
JobExecutionEnvironment jobExecutionEnv =
                    seaTunnelClient.createExecutionContext(filePath, jobConfig);
现在:
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
            ClientJobExecutionEnvironment jobExecutionEnv =
                    seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
                    

  JobInstanceServiceImpl类:

public void complete(
            @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId)方法:
原来:
if (statusList.size() == 1 && statusList.contains("FINISHED")) {
            jobStatus = JobStatus.FINISHED.name();
        } else if (statusList.contains("FAILED")) {
            jobStatus = JobStatus.FAILED.name();
        } else if (statusList.contains("CANCELED")) {
            jobStatus = JobStatus.CANCELED.name();
        } else if (statusList.contains("CANCELLING")) {
            jobStatus = JobStatus.CANCELLING.name();
        } else {
            jobStatus = JobStatus.RUNNING.name();
        }
现在:
if (statusList.size() == 1 && statusList.contains("FINISHED")) {
            jobStatus = JobStatus.FINISHED.name();
        } else if (statusList.contains("FAILED")) {
            jobStatus = JobStatus.FAILED.name();
        } else if (statusList.contains("CANCELED")) {
            jobStatus = JobStatus.CANCELED.name();
        } else if (statusList.contains("CANCELING")) {
            jobStatus = JobStatus.CANCELING.name();
        } else {
            jobStatus = JobStatus.RUNNING.name();
        }

  PluginDiscoveryUtil类的getConnectorFeatures方法

原来:
public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
            PluginType pluginType) throws IOException {
        Common.setStarter(true);
        if (!pluginType.equals(PluginType.SOURCE)) {
            throw new UnsupportedOperationException("ONLY support plugin type source");
        }
        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
        List<Factory> factories;
        if (path.toFile().exists()) {
            List<URL> files = FileUtils.searchJarFiles(path);
            factories =
                    FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
        } else {
            factories =
                    FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
        }
        Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
        factories.forEach(
                plugin -> {
                    if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                        TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                        PluginIdentifier info =
                                PluginIdentifier.of(
                                        "seatunnel",
                                        PluginType.SOURCE.getType(),
                                        plugin.factoryIdentifier());
                        featureMap.put(
                                info,
                                new ConnectorFeature(
                                        SupportColumnProjection.class.isAssignableFrom(
                                                tableSourceFactory.getSourceClass())));
                    }
                });
        return featureMap;
    }
现在:
public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
            PluginType pluginType) throws IOException {
        Common.setStarter(true);
        if (!pluginType.equals(PluginType.SOURCE)) {
            throw new UnsupportedOperationException("ONLY support plugin type source");
        }
        List<Factory> factories = null;
        SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery =
                new SeaTunnelSinkPluginDiscovery();
        Map<PluginIdentifier, String> allSupportedPlugins =
                seaTunnelSinkPluginDiscovery.getAllSupportedPlugins(pluginType);
        for (Entry<PluginIdentifier, String> entry : allSupportedPlugins.entrySet()) {
            PluginIdentifier pluginIdentifier = entry.getKey();
            List<PluginIdentifier> pluginIdentifiers = new ArrayList<>();
            pluginIdentifiers.add(pluginIdentifier);
            List<URL> files = seaTunnelSinkPluginDiscovery.getPluginJarPaths(pluginIdentifiers);
            if (CollectionUtils.isNotEmpty(files)) {
                factories =
                        FactoryUtil.discoverFactories(
                                new URLClassLoader(files.toArray(new URL[0])));
            } else {
                factories =
                        FactoryUtil.discoverFactories(
                                Thread.currentThread().getContextClassLoader());
            }
        }
        Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
        if (CollectionUtils.isNotEmpty(factories)) {
            factories.forEach(
                    plugin -> {
                        if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                            TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                            PluginIdentifier info =
                                    PluginIdentifier.of(
                                            "seatunnel",
                                            PluginType.SOURCE.getType(),
                                            plugin.factoryIdentifier());
                            featureMap.put(
                                    info,
                                    new ConnectorFeature(
                                            SupportColumnProjection.class.isAssignableFrom(
                                                    tableSourceFactory.getSourceClass())));
                        }
                    });
        }
        return featureMap;
    }

  SchemaDerivationServiceImpl类derivationSQL中

原来:
TableFactoryContext context =
                new TableFactoryContext(
                        Collections.singletonList(table),
                        ReadonlyConfig.fromMap(config),
                        Thread.currentThread().getContextClassLoader());
现在:
TableTransformFactoryContext context =
                new TableTransformFactoryContext(
                        Collections.singletonList(table),
                        ReadonlyConfig.fromMap(config),
                        Thread.currentThread().getContextClassLoader());

  SeaTunnelEngineProxy类restoreJob方法中

原来:
seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
现在:
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
            seaTunnelClient
                    .restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
                    .execute();

  TableSchemaServiceImpl类TableSchemaServiceImpl()方法

原来:
public TableSchemaServiceImpl() throws IOException {
        Common.setStarter(true);
        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
        if (path.toFile().exists()) {
            List<URL> files = FileUtils.searchJarFiles(path);
            files.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
            factory = new DataTypeConvertorFactory(new URLClassLoader(files.toArray(new URL[0])));
        } else {
            factory = new DataTypeConvertorFactory();
        }
    }
现在:
public TableSchemaServiceImpl() throws IOException {
        Common.setStarter(true);
        SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery =
                new SeaTunnelSinkPluginDiscovery();
        Map<PluginIdentifier, String> allSupportedPlugins =
                seaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK);
        for (Map.Entry<PluginIdentifier, String> entry : allSupportedPlugins.entrySet()) {
            PluginIdentifier pluginIdentifier = entry.getKey();
            List<PluginIdentifier> pluginIdentifiers = new ArrayList<>();
            pluginIdentifiers.add(pluginIdentifier);
            List<URL> files = seaTunnelSinkPluginDiscovery.getPluginJarPaths(pluginIdentifiers);
            if (CollectionUtils.isNotEmpty(files)) {
                factory =
                        new DataTypeConvertorFactory(new URLClassLoader(files.toArray(new URL[0])));
            } else {
                factory = new DataTypeConvertorFactory();
            }
        }
    }

  TableSchemaServiceImpl类getSeaTunnelSchema中

原来:
SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
现在:
SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType(), null);

  到此web1.0.0的兼容代码已经修改完成。

3.4 web编译打包

  执行如下编译打包命令

mvn clean package -pl seatunnel-web-dist -am '-Dmaven.test.skip=true' -T 8C

4.运行mysql-cdc示例

  经过以上的步骤,seatunnel2.3.5和web1.0.0的适配已经可以正常在dist下打包成功,

4.1配置运行seatunnel

  环境变量和启动类配置

  环境变量:这个是dis的target的二进制解压路径(运行家目录)

-DSEATUNNEL_HOME=D:\other-workspace\seatunnel\seatunnel-dist\target\apache-seatunnel-2.3.5-SNAPSHOT

image-20240430165036398

4.2配置运行web

  环境变量、jvm参数和启动类配置

  环境变量:这个和seatunnel的是同一个

-DSEATUNNEL_HOME=D:\other-workspace\seatunnel\seatunnel-dist\target\apache-seatunnel-2.3.5-SNAPSHOT

  jvm参数:是web-dis下的target的加压路径(运行家目录)

ST_WEB_BASEDIR_PATH=D:\other-workspace\seatunnel-web\seatunnel-web-dist\target\apache-seatunnel-web-1.0.0-SNAPSHOT

4.3 拷贝jar到seatunnel家目录的lib和web家目录的libs下

  seatunnel的家目录:

在这里插入图片描述

  web的家目录:

image-20240430165608746

  需要在这seatunnel家目录的lib和web家目录的libs下放入如下jar包:

mysql-connector-java-8.0.33.jar
datasource-jdbc-mysql-1.0.0-SNAPSHOT.jar
connector-jdbc-2.3.5-SNAPSHOT-2.12.15.jar
connector-cdc-mysql-2.3.5-SNAPSHOT-2.12.15.jar

  其它的cdc也是一样的都要将所需要的jar放到这两个路径下,否则,缺少依赖运行会报错哦。

  点击debug将两个项目启动起来,能正常启动就是ok的,seatunnel启动有一个hadoop的报错,不影响可以正常启动的,如果有hadoop环境就不会有hadoop的报错的。

4.4 ui编译运行注意事项

  本机上安装的node、npm的版本需要大于等于web项目中规定的版本,否则会编译失败,如果版本过低,需要将web-dist的pom中的如下插件注释:

  版本过低会编译打包会遇到如下错误:

[ERROR] Failed to execute goal com.github.eirslett:frontend-maven-plugin:1.11.3:npm (build) on project seatunnel-web-dist: Failed to
 run task: 'npm run build:prod' failed. org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) ->

  解决:

 <!--<plugin>
                        <groupId>com.github.eirslett</groupId>
                        <artifactId>frontend-maven-plugin</artifactId>
                        <version>1.11.3</version>
                        <configuration>
                            <workingDirectory>${project.basedir}/../seatunnel-ui</workingDirectory>
                        </configuration>
                        <executions>
                            <execution>
                                <id>install node and npm</id>
                                <goals>
                                    <goal>install-node-and-npm</goal>
                                </goals>
                                <configuration>
                                    <nodeVersion>v14.17.3</nodeVersion>
                                    <npmVersion>6.14.13</npmVersion>
                                </configuration>
                            </execution>
                            <execution>
                                <id>install</id>
                                <goals>
                                    <goal>npm</goal>
                                </goals>
                                <phase>generate-resources</phase>
                                <configuration>
                                    <arguments>install &#45;&#45;ignore-scripts</arguments>
                                </configuration>
                            </execution>
                            <execution>
                                <id>build</id>
                                <goals>
                                    <goal>npm</goal>
                                </goals>
                                <configuration>
                                    <arguments>run build:prod</arguments>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>-->

  然后去升级本地安装的node、npm,先卸载低版本,然后安装最新稳定高版本,重重之重:配置node的环境变量,npm的环境变量,这个我之前的文章的评论里面有,或者去百度即可。然后将ui启动进行mysql-cdc的示例。

4.5mysql-cdc的示例

  添加数据源

image-20240430172002676

  创建cdc任务

  下面创建了两种类型的任务:数据集成和整库同步,任务报存类型必须是流式类型,如果选择批类型,运行会报不支持的错误,这个两个类型的任务唯一的区别就是数据集成只能选一张表,整库同步可以选多张表来进行cdc任务,源source的的数据源是一个mysql-cdc的一个数据源,而sink端的数据源是一个jdbc的数据源,所以在sink端只能选jdbc的数据源,不能选cdc的数据源,在进行cdc的时候需要检查msyql8的binlog监听是否开启,如果没有开启,需要开启mysql8.0的binlog监听

image-20240430172020919

  任务执行现象和结论:

  点击执行按钮后去目的地表查看,没有数据,过了一会后去查看目的地表有数据,然后改源表中的一条数据的一个字段的值,在去目的地表查看对应的字段也变更了,说明mysql-cdc单表cdc(先做全量后做增量和实时)的demo是成功了。

  在web项目下会有一个profile里面会保存每次创建的任务的文件,下面是我随便找了一个用作demo文件,这个文件的内容都是web页面配置生成到这个profile下的:

env {
"job.mode"=STREAMING
"job.name"="SeaTunnel_Job"
}
source {
MySQL-CDC {
    format=DEFAULT
    "snapshot.split.size"=8096
    "snapshot.fetch.size"=1024
    "incremental.parallelism"=1
    "connect.timeout.ms"=30000
    "connect.max-retries"=3
    "connection.pool.size"=20
    "chunk-key.even-distribution.factor.lower-bound"=0.05
    "chunk-key.even-distribution.factor.upper-bound"=100
    "sample-sharding.threshold"=1000
    "inverse-sampling.rate"=1000
    "startup.mode"=INITIAL
    "exactly_once"="true"
    "stop.mode"=NEVER
    parallelism=1
    "result_table_name"=Table13434473575488
    "dag-parsing.mode"=MULTIPLEX
    catalog {
        factory=Mysql
    }
    database-names=[
        "xxxxx"
    ]
    table-names=[
        "xxxx.xx_order"
    ]
    password="xxxx"
    username=root
    base-url="jdbc:mysql://xxxx:3306/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
    server-time-zone=UTC
}
}
transform {
}
sink {
Jdbc {
    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
    "data_save_mode"="APPEND_DATA"
    "connection_check_timeout_sec"=30
    "batch_size"=1000
    "is_exactly_once"="true"
    "xa_data_source_class_name"=test-cdc1
    "max_commit_attempts"=3
    "transaction_timeout_sec"=-1
    "auto_commit"="true"
    "support_upsert_by_query_primary_key_exist"="true"
    "multi_table_sink_replica"=1
    "source_table_name"=Table13434473575488
    "generate_sink_sql"=true
    catalog {
        factory=MySQL
        username=root
        password="xxx"
        base-url="jdbc:mysql://xxxx:3306/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
    }
    database="xxx_test"
    url="jdbc:mysql://xxxxx:3306/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
    driver="com.mysql.cj.jdbc.Driver"
    password="xxxxxx"
    user=root
}
}

  运行遇到一个报错:

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.Long to field org.apache.seatunnel.api.table.catalog.Column.columnLength of type java.lang.Integer in instance of org.apache.seatunnel.api.table.catalog.PhysicalColumn

  这个报错是由于我之前把seatunnel的代码拉下来默认在dev分支,两边引擎的代码不一致导致,所以文章开头特别强调需要切换分支到2.3.4-release上,然后重新编译将seatunnel的jar拷贝到web项目新建的lib路径下统一管理外部jar下,重新编译web后跑起来运行两边的代码就是一致的了,web依赖seatunnel需要以seatunnel中编译的为主,否则就会有一些奇奇怪怪的问。

  同步任务实例:

  下面是一个历史任务执行可以查看的界面,由于我们配置的存储是localfile的,所以这个刷新这个同步任务实例或者点击里面的任务查看会报错,是因为这里的接口是去hdfs上找历史任务的jar包,本地没有hadoop环境所以会报错,特此说明,不影响我们mysql-cdc的操作的,正式环境有hadoop环境就没有这个问题了,这个是windows上本地编译运行需要hadoop环境,否则会报错,但是在linux环境不会,这个存储介质支持:hdfs、oss(阿里oss-hdfs)、s3等

image-20240430172037387

5.总结

  其实他的逻辑也简单的,就是根据主键唯一标识分割数据发给不同的节点上两阶段执行,每个节点都是执行source --> t —> sink,连接段提交事务才算完成一整个链路的数据同步的,如果是cdc的cdc的那个插件,拿mysql来讲是会监听binlog的日志变化,然后读取变化的数据发给节点sink根据主键更新,source的数据源是一个cdc的数据源,而且sink端的数据源是一个jdbc的数据源的;这个跟写业务代码处理数据同步有啥区别么?
  比如说同步一张有几千万数据的一个单表,
  第一步:分页根据id升序查出所有的id,(或者是查一个list,分割list给多个线程执行)
  第二步:一页一个线程处理数据同步
  第三部:加一个栅栏同步等待所有数据同步线程处理完成,然后主线程才算执行完成
思路是不是都大同小异:都是分割数据,多线程执行(分布式多节点执行任务),栅栏同步等待全部线程执行完成(两阶段事务,read + writ都之心完成才算执行成功,否则回退),根据主键分割数据,然后下发给多节点同步处理数据,在加一个两阶段事务,保证数据read + write 两边一致性,中间在做一个job的checkpoin(检查点) ,savepoint(保存点) 这两个都是涉及到任务执行的情况状态等信息的持久存储,所以可以存储在hdfs/oss(ali-hdfs-oss)/s3等分布式存储,可以多线程任务共享数据,无非可以在把filink / spark /自定义引擎 啥的搞一套,在一个加个插件发现机制

source + t + sink

  这个三个端都有自己的不同数据源的实现,可以加载发现自己的jar包,无非搞了一个web控制台可视化管理任务,就这么简单的,这个设计思想是可以学习可借鉴,本文用之中方式适配了运行了一个mysql-cdc单表的数据同步,如果是其他的需要用这种方式去适配,如果有其它的api兼容性问题需要自己处理解决,我只是提供一种思路和方法,之前只搞了一个mysql-jdbc的单表数据同步,这次摸索搞了一个mysql-jdbc的单表数据同步,只要熟悉项目代码,能把项目本地编译运行起来就可以修改拓展源码,比如:你可以新增一个其他的数据源的支持,对写入数据的某一个字段值在sink的时候拼接一个字符串啥的需求也是可以做的等等等,或者你可以给官方贡献代码了。希望我的分享对你有所启发和帮助,请一键三连,么么么哒!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1636088.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何在iPhone上恢复出厂设置后恢复数据

你不想让这种情况发生&#xff0c;但它确实发生了。您必须将iPhone恢复出厂设置。当您的 iPhone 上出现软件问题且无法修复时&#xff0c;可能会发生这种情况。相反&#xff0c;在更新期间&#xff0c;或者您的iPhone遇到问题时&#xff0c;iPhone上的数据不再存在。 不过不用…

ESP32 烧录固件

第一步&#xff1a;下载固件 git clone --recursive https://github.com/espressif/esp-at.git 第二步&#xff1a;执行编译 在该目录执行 python build.py install 如图&#xff1a; 第三步&#xff1a;选择芯片 输入2 第四步&#xff1a;选择固件 输入1 第五步&#…

【云原生】Docker 实践(三):使用 Dockerfile 文件构建镜像

Docker 实践&#xff08;三&#xff09;&#xff1a;使用 Dockerfile 文件构建镜像 1.使用 Dockerfile 文件构建镜像2.Dockerfile 文件详解 1.使用 Dockerfile 文件构建镜像 Dockerfile 是一个文本文件&#xff0c;其中包含了一条条的指令&#xff0c;每一条指令都用于构建镜像…

吴恩达深度学习笔记:深度学习的 实践层面 (Practical aspects of Deep Learning)1.11-1.12

目录 第二门课: 改善深层神经网络&#xff1a;超参数调试、正 则 化 以 及 优 化 (Improving Deep Neural Networks:Hyperparameter tuning, Regularization and Optimization)第一周&#xff1a;深度学习的 实践层面 (Practical aspects of Deep Learning)1.11 神经网络的权重…

第三方风险:当今互联世界中日益严重的威胁

现代商业环境因协作而蓬勃发展。公司依靠庞大的第三方供应商、供应商、承包商和合作伙伴网络来提供产品、服务和专业知识。 虽然这种错综复杂的关系网络促进了创新和效率&#xff0c;但它也使组织面临隐藏的威胁&#xff1a;第三方风险。 第三方风险是指因公司第三方的作为或…

GitLab常用指令!(工作中常用的)

目录 克隆代码创建分支切换分支将代码提交到分支当中Merge合并 克隆代码 复制完地址&#xff0c;打开Git Bash&#xff0c;然后 git clone “复制的地址”创建分支 创建new_test分支 git branch new_test切换分支 切换到new_test分支 git checkout new_test将代码提交到分…

Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析

背景 在上一篇的博客里&#xff0c;大致介绍了flink checkpoint中的触发的大体流程&#xff0c;现在介绍一下触发之后下游的算子是如何做snapshot。 上一篇的文章: Flink checkpoint 源码分析- Flink Checkpoint 触发流程分析-CSDN博客 代码分析 1. 在SubtaskCheckpointCoo…

使用 LlamaIndex 和 Llama 2-Chat 构建知识驱动的对话应用程序

文章目录 使用 LlamaIndex 和 Llama 2-Chat 构建知识驱动的对话应用程序Llama 2-70B-聊天LlamaIndex 解决方案概述先决条件使用 SageMaker JumpStart 部署 GPT-J 嵌入模型使用 SageMaker Python SDK 进行部署在 SageMaker Studio 中使用 SageMaker JumpStart 进行部署使用 Sage…

Hotcoin Research | 市场洞察:2024年4月22日-28日

加密货币市场表现 本周内加密大盘整体呈现出复苏状态&#xff0c;在BTC减半后进入到震荡上行周期。BTC在$62000-66000徘徊&#xff0c;ETH在$3100-3300徘徊&#xff0c;随着港交所将于 4 月 30 日开始交易嘉实基金的比特币和以太坊现货 ETF&#xff0c;周末行情有一波小的拉升…

从MySQL+MyCAT架构升级为分布式数据库,百丽应用OceanBase 4.2的感受分享

本文来自OceanBase的客户&#xff0c;百丽时尚的使用和测试分享 业务背景 百丽时尚集团&#xff0c;作为国内大型时尚鞋服集团&#xff0c;在中国超过300个城市设有直营门店&#xff0c;数量超过9,000家。集团构建了以消费者需求为核心的垂直一体化业务模式&#xff0c;涵盖了…

VTK —— 二、教程五 - 通过鼠标事件与渲染交互(附完整源码)

代码效果 本代码编译运行均在如下链接文章生成的库执行成功&#xff0c;若无VTK库则请先参考如下链接编译vtk源码&#xff1a; VTK —— 一、Windows10下编译VTK源码&#xff0c;并用Vs2017代码测试&#xff08;附编译流程、附编译好的库、vtk测试源码&#xff09; 教程描述 本…

关于下载上传的sheetjs

一、背景 需要讲后端返回来的表格数据通过前端设置导出其中某些字段&#xff0c;而且得是xlsx格式的。 那就考虑使用控件SheetJS。如果是几年前&#xff0c;一般来说&#xff0c;保存excel的文件都是后端去处理&#xff0c;处理完成给前端一个接口&#xff0c;前端调用了打开…

初学React基础

最近准备跟着黑马React学一下React&#xff0c;扩充一下技术面&#xff0c;打算还是以一边学习一边记笔记为主&#xff0c;进行学习&#xff01; 1. React介绍 1.1. React是什么&#xff1f; React是由FaceBook现在称&#xff08;Meta&#xff09;开发的开源 JavaScript 库&a…

如何使用 ArcGIS Pro 查找小区最近的地铁站

学习 GIS 除了可以用在工作上之外&#xff0c;还可以将其运用到生活之中&#xff0c;比如查找距离小区最近的地铁站&#xff0c;这里为大家介绍一下查找的方法&#xff0c;希望能对你有所帮助。 数据来源 教程所使用的数据是从水经微图中下载的POI数据&#xff0c;除了POI数据…

Python流程控制

描述 Python中的流程控制是编程中用来控制代码执行顺序的结构。包括条件判断&#xff08;if语句&#xff09;、循环&#xff08;for循环和while循环&#xff09;、以及用于跳出或跳过循环的break和continue语句。 条件判断&#xff08;if语句&#xff09; if语句允许我们根据…

C++基础语法练习 - 求平均值

题目链接&#xff1a;https://www.starrycoding.com/problem/156 题目描述 在StarryCoding的语法班期末考试里&#xff0c; n n n名同学的成绩分别为 a 1 , a 2 , . . . , a n a_1, a_2, ..., a_n a1​,a2​,...,an​&#xff0c;请求出全班分数的平均值。 输入格式 第一行…

正则化回归

1. L1正则化 L1正则化是回归参数各个元素绝对值之和。 2. L2正则化 L2正则化是回归参数各个元素平方之和。 3.LOSS回归 线性回归加上L1正则化 4.岭回归 线性回归加上L2正则化 不断增大 L2 约束项参数 α&#xff0c;可以发现岭回归参数优化解不断靠近原点&#xff0c…

中职数字化校园网络建设

中职&#xff08;职校&#xff09;数字化校园的校园网建设立足于职业教育、职业学校的特点&#xff0c;充分注意到信息技术的飞速发展、通信成本的迅速降低的特点。在规划和建设校园网时&#xff0c;遵循以下原则&#xff1a; 采用通用和成熟的技术&#xff1a;采用通用的、成熟…

Android创建快捷方式到桌面

效果图 参考 https://blog.51cto.com/u_16175498/8811197https://blog.51cto.com/u_16175498/8811197 权限 <uses-permission android:name"com.android.launcher.permission.INSTALL_SHORTCUT" /> 实现 if (Build.VERSION.SDK_INT > Build.VERSION_C…

macOS 如何关闭文本编辑区的拼写检查?

macOS 如何关闭文本编辑区的拼写检查&#xff1f; 在使用 macOS 自带软件 Freeform 的时候&#xff0c;发现有些单词的拼写检查会一直显示红色下划线&#xff0c;很烦。 找了下&#xff0c;找到了如新关闭这种提示 之后就正常了&#xff1a;