seatunnel数据集成(三)多表同步

news2025/1/16 21:03:29

seatunnel数据集成(一)简介与安装
seatunnel数据集成(二)数据同步
seatunnel数据集成(三)多表同步
seatunnel数据集成(四)连接器使用


seatunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例。

1、单表 to 单表

一个source,一个sink

env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        query = "select * from base_region"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
  jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/dw"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
    query = "insert into base_region(id,region_name) values(?,?)"
  }
}
./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf

2、单表 to 多表

一个source,多个sink

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        result_table_name="t_user"
        query = "select * from t_user;"
    }
}

transform {
  Sql {
    source_table_name = "t_user"
    result_table_name = "t_user_nan"
    query = "select id,name,birth,gender from t_user where gender ='男';"
  }
  Sql {
    source_table_name = "t_user"
    result_table_name = "t_user_nv"
    query = "select id,name,birth,gender from t_user where gender ='女';"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    source_table_name = "t_user_nan"
    query =  "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"
  }
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    source_table_name = "t_user_nv"
    query =  "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"
  }
}
./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf

3、多表 to 单表

多个source,一个sink

表结构:

-- dw 源表1
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (
  `event_time` timestamp COMMENT '业务时间',
  `device_id` VARCHAR(32) COMMENT '设备id',
  `device_type` VARCHAR(32) COMMENT '设备类型',
  `device_name` VARCHAR(128) COMMENT '设备名称',
  `cpu_usage` INT COMMENT 'CPU使用率百分比'
) ;

INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', '交换器1', 49);
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', '交换器2', 65);

-- dw 源表2
CREATE TABLE IF NOT EXISTS ads_device_router_performance (
  `event_time` timestamp COMMENT '业务时间',
  `device_id` VARCHAR(32) COMMENT '设备id',
  `device_type` VARCHAR(32) COMMENT '设备类型',
  `device_name` VARCHAR(128) COMMENT '设备名称',
  `cpu_usage` INT COMMENT 'CPU使用率百分比'
);

INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', '路由器1', 35);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', '路由器2', 46);


-------------------------------------------------------------------------------
-- olap 目标表
CREATE TABLE `device_performance` (
  `id` INT NOT NULL AUTO_INCREMENT COMMENT '表主键',
  `event_time` VARCHAR(32) NOT NULL COMMENT '业务时间',
  `device_id` VARCHAR(32) COMMENT '设备id',
  `device_type` VARCHAR(32) COMMENT '设备类型',
  `device_name` VARCHAR(128) NOT NULL COMMENT '设备名称',
  `cpu_usage` FLOAT NOT NULL COMMENT 'CPU利用率单位是%',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) COMMENT='设备状态';
env {
    job.mode="BATCH"
    job.name="device_performance"
}

source {
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        result_table_name="switch_src"
        query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"
    }
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        result_table_name="router_src"
        query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"
    }
}

transform {
  Sql {
    source_table_name = "switch_src"
    result_table_name = "switch_dst"
    query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"
  }
  Sql {
    source_table_name = "router_src"
    result_table_name = "router_dst"
    query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"
  }
}

sink {
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "switch_dst"
        query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
      }
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "router_dst"
        query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
       }
}

./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf

4、多表 to 多表

多个source,多个sink

env {
    job.mode="BATCH"
    job.name="device_performance"
}

source {
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        result_table_name="switch_src"
        query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"
    }
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        result_table_name="router_src"
        query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"
    }
}

transform {
  Sql {
    source_table_name = "switch_src"
    result_table_name = "switch_dst"
    query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"
  }
  Sql {
    source_table_name = "router_src"
    result_table_name = "router_dst"
    query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"
  }
}

sink {
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "switch_dst"
        query="INSERT INTO device_performance_switch  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
      }
    Jdbc {
        url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
        driver="com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "router_dst"
        query="INSERT INTO device_performance_router  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
       }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1434918.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BC100 有序序列合并

描述 输入两个升序排列的序列,将两个序列合并为一个有序序列并输出。 数据范围: 1≤n,m≤1000 1≤n,m≤1000 , 序列中的值满足 0≤val≤30000 输入描述: 输入包含三行, 第一行包含两个正整数n, m,用空…

前端使用pdf.js进行pdf文件预览的第二种方式:Viewer.html

背景 最近需要实现一个PDF文档预览的功能&#xff0c;按理说&#xff0c;如果只是简单的预览&#xff0c;使用<embed>、<object>等就可以实现。 但是&#xff0c;我们的需求要实现搜索&#xff01;而且&#xff0c;文档还都超大&#xff0c;均300页以上。那<e…

【网站项目】039菜匣子优选生鲜电商系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

网络编程-序列化和反序列化/应用层协议/

预备知识 理解为什么要应用层协议&#xff1f; 在学过套接字编程后&#xff0c;我们对协议的理解更深了一步&#xff0c;协议也就是一种约定&#xff0c;也可以通俗理解为一种口头约定&#xff0c;对于通信双方来说是必须要遵守的。TCP和UDP协议它们是传输层控制协议&#xf…

XAI:探索AI决策透明化的前沿与展望

文章目录 &#x1f4d1;前言一、XAI的重要性二、为什么需要可解释人工智能三、XAI的研究与应用四、XAI的挑战与展望 &#x1f4d1;前言 随着人工智能技术的快速发展&#xff0c;它已经深入到了我们生活的方方面面&#xff0c;从智能手机、自动驾驶汽车到医疗诊断和金融投资&…

[UI5 常用控件] 07.SplitApp,SplitContainer

文章目录 前言1. SplitApp1.1 组件结构1.2 Demo1.3 mode属性 2. SplitContainer 前言 本章节记录常用控件SplitApp&#xff0c;SplitContainer。主要功能是在左侧显示Master页面&#xff0c;右侧显示Detail页面。 Master页面和Detail页面可以由多个Page组成&#xff0c;并支持…

2024数学建模美赛F题Reducing Illegal Wildlife Trade原创论文讲解(含完整python代码)

大家好呀&#xff0c;从发布赛题一直到现在&#xff0c;总算完成了数学建模美赛本次F题目非法野生动物贸易完整的成品论文。 本论文可以保证原创&#xff0c;保证高质量。绝不是随便引用一大堆模型和代码复制粘贴进来完全没有应用糊弄人的垃圾半成品论文。 F题论文共42页&…

ios设备解锁 --Apeaksoft iOS Unlocker

Apeaksoft iOS Unlocker是一款针对iOS系统的密码解锁工具。其主要功能包括解锁多种锁屏类型&#xff0c;包括数字密码、Touch ID、Face ID和自定义密码。此外&#xff0c;它还可以帮助用户删除iPhone密码以进入锁屏设备&#xff0c;忘记的Apple ID并将iPhone激活为新的&#xf…

2023年12月 Python(四级)真题解析#中国电子学会#全国青少年软件编程等级考试

Python等级考试(1~6级)全部真题・点这里 一、单选题(共25题,共50分) 第1题 下列有关分治算法思想的描述不正确的是?( ) A:将问题分解成的子问题具有相同的模式。 B:将问题分解出的各个子问题相互之间有公共子问题。 C:当问题足够小时, 可以直接求解。 D:可以将…

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之QRCode组件

鸿蒙&#xff08;HarmonyOS&#xff09;项目方舟框架&#xff08;ArkUI&#xff09;之QRCode组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、QRCode组件 用于显示单个二维码的组件。 子组件 无。 接口 QRCode(value: st…

MongoDB的操作和理解

什么是MongoDB? MongoDB&#xff1a;基于分布式文件存储的数据库由C语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。MongoDB是一个介于关系数据库和非关系数据库(nosql)之间的产品&#xff0c;是非关系数据库当中功能最丰富&#xff0c;最像关系数据库的。 Mo…

【LeetCode】每日一题 2024_2_4 Nim 游戏(找规律,博弈论)

文章目录 LeetCode&#xff1f;启动&#xff01;&#xff01;&#xff01;题目&#xff1a;Nim 游戏题目描述代码与解题思路 LeetCode&#xff1f;启动&#xff01;&#xff01;&#xff01; 题目&#xff1a;Nim 游戏 题目链接&#xff1a;292. Nim 游戏 题目描述 代码与解题…

ubuntu系统下c++ cmakelist vscode debug(带传参的debug)的详细示例

c和cmake的debug&#xff0c;网上很多都需要配置launch.json&#xff0c;cpp.json啥的&#xff0c;记不住也太复杂了&#xff0c;我这里使用cmake插件带有的设置&#xff0c;各位可以看一看啊✌(不知不觉&#xff0c;竟然了解了vscode中配置文件的生效逻辑&#x1f923;) 克隆…

go test单元测试详解

目录 介绍&测试范围 测试函数 执行机制 常用执行模式 子测试 帮助函数Helper() 测试覆盖率 介绍&测试范围 go test测试是go自带的测试工具&#xff0c;主要包括单元测试和性能测试两大类。 包括了工程目录下所有以_test.go为后缀名的源代码文件&#xff0c;这…

C++ 语法文件

程序运行时产生的数据都属于临时数据&#xff0c;程序结束就会被释放。 通过文件可以可以将数据持久化 c中对文件操作需要包含头文件fstream 文件的类型分为两种 1.文本文件 文件以文本的ASCII码形式存储在计算机中 2.二进制文件 稳重以文本的二进制形式存储在计算机中 用…

基于idea解决springweb项目的Java文件无法执行问题

前言 上一篇文章的话介绍了spring以及创建spring项目&#xff0c;但是因为有宝子私聊我说创建的项目那个JAVA文件显示灰色还有一个红点&#xff0c;问我怎么解决下面我来简答的写一下怎么修改配置让他正常的运行 配置 原因好像是因为基于maven的JAVA项目构架&#xff0c;对应…

Android Studio中打开文件管理器

文章目录 一、前言二、操作步骤 一、前言 在Android Studio中有时候需要查看手机的文件目录或者复制文件&#xff0c;但是有时候文件管理器找不到在哪&#xff0c;这里记录该操作流程 二、操作步骤 第一步: 第二步: 第三步:

CentOS7搭建k8s-v1.28.6集群详情

文章目录 1.灌装集群节点操作系统1.1 设置hosts1.2 设置nameserver1.3 关闭防火墙1.4 关闭Selinux1.5 关闭Swap分区1.6 时间同步1.7 调整内核参数1.8 系统内核升级 2.安装Docker2.1 卸载旧Docker2.2 配置Docker软件源2.3 安装Docker 3.部署Kubernets集群3.1 设置 K8s 软件源3.2…

51单片机 跑马灯

#include <reg52.h>//毫秒级延时函数 void delay(int z) {int x,y;for(x z; x > 0; x--)for(y 114; y > 0 ; y--); }sbit LED1 P1^0x0; sbit LED2 P1^0x1; sbit LED3 P1^0x2; sbit LED4 P1^0x3; sbit LED5 P1^0x4; sbit LED6 P1^0x5; sbit LED7 P1^0x6; s…

属性“xxxx”在类型“ArrayConstructor”上不存在。是否需要更改目标库? 请尝试将 “lib” 编译器选项更改为“es2015”或更高版本。

使用vscode编写vue&#xff0c;在使用elementUI时&#xff0c;发现代码中的form报错如下&#xff1a; 属性“form”在类型“ArrayConstructor”上不存在。是否需要更改目标库? 请尝试将 “lib” 编译器选项更改为“es2015”或更高版本。 解决方法&#xff1a; 打开jsconfig.…