使用 Apache SeaTunnel 在 MySQL 和 HTTP 之间的数据同步示例

news2024/10/11 6:26:10

随着现代企业数据量的不断增长,跨系统、跨平台的数据同步需求变得愈发重要。

在实际的业务场景中,开发者常常需要将数据从 MySQL 同步到其他系统,或者从不同的数据源同步回 MySQL。Apache SeaTunnel 作为一款高效的分布式数据集成平台,支持批处理和流处理,能够灵活地完成这些任务。

本文将详细介绍如何使用 Apache SeaTunnel 实现以下几种常见的数据同步场景:

  • MySQL 同步到 HTTP 接口
  • MySQL 同步到 MySQL
  • HTTP 接口同步到 MySQL
  • MySQL-CDC 同步到 HTTP 接口

我们将逐一展示这些同步场景的配置方式,并提供清晰的代码示例,帮助开发者快速掌握 SeaTunnel 在不同场景下的应用。

官方文档参考

SeaTunnel JDBC Source Connector

前置准备

在开始之前,请确保已经下载了对应版本的 MySQL JDBC 驱动 mysql-connector-java-xxx.jar,并将其放置在 SeaTunnel 的安装目录下的 lib 文件夹中。

可以从以下链接获取:https://mvnrepository.com/artifact/mysql/mysql-connector-java

对于使用 Spark 或 Flink 的 SeaTunnel 任务,也需要将该 JAR 包复制到相应的目录下:

  • Spark: $SPARK_HOME/jars/
  • Flink: $FLINK_HOME/lib/

接下来,我们将逐一展示四种数据同步的配置和代码示例。

MySQL 同步到 HTTP 接口

在此场景中,我们将 MySQL 数据表中的信息同步到指定的 HTTP 接口。

这里假设我们从 user_info 表中查询数据并通过 HTTP POST 请求将其发送到目标 API。

env {
  execution.parallelism = 2
  job.mode = "BATCH"  # MySQL 作为数据源,只支持批量同步
}

source {
   jdbc {
     url =  "jdbc:mysql://172.27.10.22:6033/test"
     driver = "com.mysql.cj.jdbc.Driver"
     connection_check_timeout_sec = 100
     user = "root"
     password = "root"
     query = "SELECT * FROM user_info ORDER BY create_time LIMIT 1"
     result_table_name = "user_info_out"
  }
}

transform {
    Sql {
      source_table_name = "user_info_out"
      result_table_name = "user_info_sink"
      query = "select info, user_name, age from user_info_out"
    }
}

sink {
  Console {
    source_table_name = "user_info_sink"
  }

  http {
    source_table_name = "user_info_sink"
    url = "https://test.test.com:8080/api/user/test"
    method = "POST"
    headers = {Accept="application/json", Content-Type="application/json;charset=utf-8"}
  }
}

MySQL 同步到 MySQL

在此示例中,我们将从一个 MySQL 数据库中提取数据,并将其同步到另一个 MySQL 数据库。此场景适用于多个数据库实例之间的数据迁移或备份。

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
    Jdbc {
        url =  "jdbc:mysql://172.27.10.22:6033/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 10
        user = "root"
        password = "root"
        query = "SELECT `name`,`score` FROM `user`"
        result_table_name = "user_info"
    }
}

sink {
  Jdbc {
        source_table_name = "user_info"
        url =  "jdbc:mysql://192.27.10.22:16033/temp_user"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "root"
        query = "INSERT INTO `student`(`name`, `score`) VALUES(?, ?)"
  }
}

HTTP 接口同步到 MySQL

本示例展示了如何将 HTTP 接口中的数据同步到 MySQL 数据库。

这在从第三方 API 获取数据并将其存储到本地数据库的场景中非常实用。

env {
  execution.parallelism = 2
  job.mode = "STREAMING"  # HTTP 作为数据源,支持批量和流式模式
  checkpoint.interval = 10000  # 执行间隔(毫秒)
}

source {
  Http {
    url = "https://test.test.com:8080/api/test"
    method = "GET"
    format = "json"
    headers = {Authorization="Bearer example-token", language="zh"}
    params = {userId="fa438165b2c84d8dbe9175d152718437"}
    content_field = "$.content.*"
    schema = {
      fields {
        userId = string
        age = int
        phone = string
        name = string
      }
    }
    result_table_name = "user_info"
  }
}

transform {
    Sql {
      source_table_name = "user_info"
      result_table_name = "user_info_out"
      query = "SELECT name as userName, userId, age, phone FROM user_info"
    }
}

sink {
  Jdbc {
     url = "jdbc:mysql://172.27.10.22:26033/test"
     driver = "com.mysql.cj.jdbc.Driver"
     connection_check_timeout_sec = 100
     user = "root"
     password = "root"
     source_table_name = "user_info_out"
     query = "INSERT INTO `user_bak`(`userName`, `userId`, `age`, `phone`) VALUES (?, ?, ?, ?)"
  }
}

MySQL-CDC 同步到 HTTP 接口

MySQL-CDC(Change Data Capture)允许实时捕获数据库中的数据变化。

在此示例中,我们将 MySQL 数据库中的变化通过 CDC 机制捕获,并将其同步到 HTTP 接口。

env {
  execution.parallelism = 2
  job.mode = "STREAMING"  # MySQL-CDC 支持批量和流式模式
  checkpoint.interval = 10000  # 执行间隔(毫秒)
}

source {
    MySQL-CDC {
      catalog = {
        factory = MySQL
      }
      base-url = "jdbc:mysql://${mysql_ip_port}/test?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false"
      username = ${mysql_username}  # 使用变量替换
      password = ${mysql_pass}  # 使用变量替换
      table-names = ["test.user"]
      startup.mode = "initial"
      result_table_name = "user_info_out"
      table-names-config = [
        {
          table = "test.user"
          primaryKeys = ["user_id"]
        }
      ]
    }
}

transform {
    FilterRowKind {
      source_table_name = "user_info_out"
      result_table_name = "user_info_sink"
      include_kinds = ["UPDATE_AFTER", "INSERT"]
    }
}

sink {
  http {
    source_table_name = "user_info_sink"
    url = "https://test.test.com:28080/api/user/test"
    method = "POST"
    headers = {Accept="application/json", Content-Type="application/json;charset=utf-8"}
  }
}

总结

通过 Apache SeaTunnel 的强大数据集成能力,开发者可以轻松实现多种数据源之间的同步操作。无论是数据库与 API 之间的数据传输,还是跨数据库的数据迁移,SeaTunnel 都为开发者提供了灵活、高效的解决方案。

希望通过本文的示例,您能够快速上手并在实际项目中 应用 SeaTunnel 进行复杂的数据同步任务。

SeaTunnel 提供的流处理和批处理模式极大地满足了多种场景下的数据处理需求,使得跨平台、跨数据源的数据集成变得更加简单、高效。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2204422.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python爬虫系列】_025.关于互斥锁(Lock)

课 程 推 荐我 的 个 人 主 页:👉👉 失心疯的个人主页 👈👈入 门 教 程 推 荐 :👉👉 Python零基础入门教程合集 👈👈虚 拟 环 境 搭 建 :👉👉 Python项目虚拟环境(超详细讲解) 👈👈PyQt5 系 列 教 程:👉👉 Python GUI(PyQt5)教程合集 👈👈

Vue使用@别名替换后端ip地址

1. 安装 types/node types/node 包允许您在TypeScript项目中使用Node.js的核心模块和API,并提供了对它们的类型检查和智能提示的支持。 npm install types/node --save-dev 比如安装之后,就可以导入nodejs的 path模块,在下面代码 import path…

【SEO】什么是SEO?

什么是SEO(搜索引擎优化)?为什么SEO对于⼀个⽹站⾄关重要? SEO 全称是搜索引擎优化(Search Engine Optimization) 因为我们目前开发的网址,需要人看到,除了通过宣传营销的方式展现…

C++ | Leetcode C++题解之第468题验证IP地址

题目&#xff1a; 题解&#xff1a; class Solution { public:string validIPAddress(string queryIP) {if (queryIP.find(.) ! string::npos) {// IPv4int last -1;for (int i 0; i < 4; i) {int cur (i 3 ? queryIP.size() : queryIP.find(., last 1));if (cur st…

ctf.bugku-eval

题目来源&#xff1a;eval - Bugku CTF 访问页面&#xff0c; 代码解释 <?phpinclude "flag.php"; //包含"flag.php"文件$a $_REQUEST[hello]; //从请求参数hello中获取值并赋给变量$a。 eval( "var_dump($a);"); //…

SQLAlchemy模型定义:映射数据库表到Python类

SQLAlchemy是一个流行的Python SQL工具包和对象关系映射&#xff08;ORM&#xff09;框架&#xff0c;它提供了一个高层的ORM以及底层的SQL表达式语言。使用SQLAlchemy&#xff0c;开发者可以以面向对象的方式来操作数据库&#xff0c;而不必编写复杂的SQL语句。本文将详细介绍…

【spring ai】java 实现RAG检索增强,超快速入门

rag 需求产生的背景介绍&#xff1a; 在使用大模型时&#xff0c;一个常见的问题是模型会产生幻觉&#xff08;即生成的内容与事实不符&#xff09;&#xff0c;同时由于缺乏企业内部数据的支持&#xff0c;导致其回答往往不够精准和具体&#xff0c;偏向于泛泛而谈。这些问题…

如何利用wsl-Ubuntu里conda用来给Windows的PyCharm开发

前提&#xff1a;咱们在wsl-Ubuntu上&#xff0c;有conda的虚拟环境 咱们直接打开PyCharm,打开Settings 更换Python Interpreter即可 当然一开始可能没有下面的选项&#xff0c;需要我们点击右边的Add Interpreter 这里选择wsl 点击next 将这两步进行修改 可以看出来&#xff0…

计算机视觉之OpenCV vs YOLO

好多开发者希望搞明白OpenCV 和YOLO区别&#xff0c;实际上&#xff0c;二者在计算机视觉领域都有广泛应用&#xff0c;但它们有很大的不同。 一、OpenCV 概述 OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一个开源的计算机视觉和机器学习软件库。它…

继承--C++

文章目录 一、继承的概念及定义1、继承的概念 二、继承定义1、定义格式2、继承基类成员访问方式的变化3、继承类模板 三、基类和派生类间的转换1、继承中的作用域2、隐藏规则&#xff1a; 四、派生类的默认成员函数1、4个常见默认成员函数2、实现⼀个不能被继承的类 五、继承与…

(八)Proteus仿真STM32单片机GPIO驱动数码管

1&#xff0c;参考上篇&#xff0c;将LED点阵屏更换成数码管如下图 2&#xff0c;修改驱动函数&#xff0c;数组seg[14]前10个是0-9数字的编码&#xff0c;后四个是空格&#xff0c;点&#xff0c;横线&#xff0c;下划线 char seg_decode(char num)//数字解码 {const char se…

【华为欧拉】国产OpenEuler服务器系统安装以及图形界面

openEuler下载 | openEuler ISO镜像 | openEuler社区官网 下载安装iso 本次选择4G的社区版本 安装&#xff0c;复制到光盘&#xff0c;光盘引导安装。虚拟机安装&#xff0c;准备好iso文件引用&#xff0c;指定好安装源&#xff0c;安装界面和centOS基本一样。选择最小安装就…

JVM系列(二) -类的加载过程介绍

一、背景介绍 我们知道 Java 是先通过编译器将.java类文件转成.class字节码文件&#xff0c;然后再通过虚拟机将.class字节码文件加载到内存中来实现应用程序的运行。 那么虚拟机是什么时候加载class文件&#xff1f;如何加载class文件&#xff1f;class文件进入到虚拟机后发…

彻底理解TypeScript函数语法

目录 参数类型基本声明默认参数剩余参数可选只读匿名函数回调函数 返回值类型函数类型表达式调用签名构造签名 函数的重载this可推导的编译选项this类型内置工具 函数是JavaScript非常重要的组成部分&#xff0c;TypeScript中也是如此&#xff0c;TypeScript 提供了强大的类型系…

网关在不同行业自动化生产线的应用

网关在不同行业自动化生产线的应用&#xff0c;展示了其作为信息与物理世界交汇点的广泛影响力&#xff0c;尤其在推动行业智能化、自动化方面发挥了不可估量的作用。以下是网关技术在污水处理、智慧农业、智慧工厂、电力改造及自动化控制等领域的深入应用剖析。 1. 污水处理 …

盒子模型的简单运用

1.块内元素与行内元素 HTML_code <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</titl…

Scala面试题大全~基础题(15题)

1&#xff1a;Scala是什么? Scala是一种多范式的编程语言&#xff0c;它结合了面向对象编程和函数式编程的特性&#xff0c;它支持面向对象、函数式和命令式编程方法。Scala运行在Java虚拟机&#xff08;JVM&#xff09;上&#xff0c;这意味着它可以与Java代码无缝集成。它还…

【多版本并发控制(MVCC)】

并发事务问题&#xff1a; MySQL隔离级别-未提交读&#xff0c;提交读&#xff0c;可重复读&#xff0c;序列化 隔离级别对于并发事务的解决情况 隔离级别脏读不可重复读幻读未提交读不可不可不可读已提交可不可不可可重复读 &#xff08;默认&#xff09;可可不可串行化&…

现货黄金价格走势图策略分析 先看“势”

在现货黄金投资市场&#xff0c;对金价走势图的趋势进行分析&#xff0c;是投资者做出明智决策的关键步骤。通过有效的趋势分析&#xff0c;投资者可以更好地预测市场的走向&#xff0c;从而制定相应的交易策略。本文将详细介绍如何分析金价的趋势&#xff0c;并探讨这种分析方…

J1学习打卡

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 # 数据预处理和加载 import torch from torch import nn, optim from torch.utils.data import DataLoader from torchvision import datasets, transforms, …