JDBC SQL Server Source Connector: 一览与实践

news2025/1/21 1:04:35

file

在快速发展的数据驱动业务环境中,确保数据在各个系统间高效、准确地同步至关重要。为了进一步的数据处理和分析,经常需要将这些数据同步到其他数据处理系统。Apache SeaTunnel 提供了一个强大而灵活的数据集成框架,使得从 SQL Server 到其他系统的数据同步变得简单且高效。

本文档将指导您如何配置 Apache SeaTunnel,使用 JDBC SQL Server Source Connector 来实现数据的有效同步。

SQL Server

JDBC SQL Server Source Connector

支持 SQL Server 版本

  • 服务器:2008(或更高版本,仅供信息参考)

支持以下引擎

Spark
Flink
Seatunnel Zeta

主要特点

  • 批处理
  • 流处理
  • 精准一次性
  • 列投影
  • 并行处理
  • 支持用户定义拆分

支持查询 SQL 并能够实现投影效果。

描述

通过 JDBC 读取外部数据源数据。

支持的数据源信息

数据源支持的版本驱动URLMaven
SQL Server支持版本 >= 2008com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433下载

数据库依赖

请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录
例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

SQL Server 数据类型Seatunnel 数据类型
BITBOOLEAN
TINYINT
SMALLINT
SHORT
INTEGERINT
BIGINTLONG
DECIMAL
NUMERIC
MONEY
SMALLMONEY
DECIMAL((指定列的指定列大小)+1,
(获取指定列的小数点右边的数字的数量。)))
REALFLOAT
FLOATDOUBLE
CHAR
NCHAR
VARCHAR
NTEXT
NVARCHAR
TEXT
STRING
DATELOCAL_DATE
TIMELOCAL_TIME
DATETIME
DATETIME2
SMALLDATETIME
DATETIMEOFFSET
LOCAL_DATE_TIME
TIMESTAMP
BINARY
VARBINARY
IMAGE
UNKNOWN
尚不支持

源选项

名称类型必需默认值描述
url字符串-JDBC 连接的 URL。例如:jdbc:sqlserver://127.0.0.1:1434;database=TestDB
driver字符串-用于连接到远程数据源的 JDBC 类名,如果使用 SQL Server,则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
user字符串-连接实例的用户名
password字符串-连接实例的密码
query字符串-查询语句
connection_check_timeout_sec整数30等待用于验证连接的数据库操作完成的秒数
partition_column字符串-并行处理的分区列,仅支持数值类型。
partition_lower_bound长整数-用于扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
partition_upper_bound长整数-用于扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
partition_num整数作业并行度分区计数的数量,仅支持正整数。默认值为作业并行度。
fetch_size整数0对返回大量对象的查询,您可以配置查询中使用的行抓取大小,以减少满足选择条件所需的数据库命中次数,从而提高性能。
零表示使用 JDBC 默认值。
common-options-源插件的常见参数,请参阅 源常用选项 以获取详细信息。

提示

如果未设置 partition_column,则将以单一并发运行;如果设置了 partition_column,则将根据任务的并发度进行并行执行。

任务示例

简单:

简单的单一任务以读取数据表

# 定义运行时环境

env {

# 您可以在此处设置 Flink 配置

execution.parallelism = 1
job.mode = "BATCH"
}
source{
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
query = "select \* from full_types_jdbc"
}
}

transform { # 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息, # 请转到 [seatunnel.apache.org/docs/transform-v2/sql](https://seatunnel.apache.org/docs/transform-v2/sql)
}

sink {
Console {}
}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {
  # 您可以在此处设置 Flink 配置
  execution.parallelism = 10
  job.mode = "BATCH"
}

source {
    Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        # 根据需要定义查询逻辑
        query = "select * from full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
    }
}

transform {
    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {
  # 您可以在此处设置 Flink 配置
  execution.parallelism = 10
  job.mode = "BATCH"
}

source {
    Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        # 根据需要定义查询逻辑
        query = "select * from full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
    }
}

transform {
    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {
  # 您可以在此处设置 Flink 配置
  execution.parallelism = 10
  job.mode = "BATCH"
}

source {
    Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        # 根据需要定义查询逻辑
        query = "select * from full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
    }
}

transform {
    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {
  # 您可以在此处设置 Flink 配置
  execution.parallelism = 10
  job.mode = "BATCH"
}

source {
    Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        # 根据需要定义查询逻辑
        query = "select * from full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
    }
}

transform {
    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

分段并行读取示例:

这是一个快速并行读取数据的分片示例

env {
  # 您可以在此处设置引擎配置
  execution.parallelism = 10
}

source {
  # 这是一个示例源插件,仅用于测试和展示源插件的功能
  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "select * from column_type_test.dbo.full_types_jdbc"
    # 并行分片读取字段
    partition_column = "id"
    # 片段数量
    partition_num = 10
  }
  # 如果您想要获取有关如何配置 Seatunnel 和查看源插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
}

transform {
  # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Console {}
  # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1177355.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

学不会Handler?那是因为你还没有看过这篇文章

对Android开发者来suo,Handler机制无疑是最重要的知识之一,大家肯定也已经看过诸多有关Handler的教学文章了,为什么你会看到这篇文章?显然是你还没学会,或者忘记了,或者想深究一下。好消息是,这…

相对而言,嵌入式开发和纯软件开发哪个更有优势?

相对而言,嵌入式开发和纯软件开发哪个更有优势? 你关心什么方面的优势? 1. 钱途? 纯软件胜,纯软件天花板高很多。嵌入式开发只能说还行。不过天花板这东西是对牛人(至少前30%的人)而言的,混饭吃的话差别就…

JTS: 16 Orientation 方向

这里写目录标题 版本代码 版本 org.locationtech.jts:jts-core:1.19.0 链接: github 代码 public static void main(String[] args) {OrientationUse orientationUse new OrientationUse();orientationUse.test02();}public void test02() {A new Coordinate(2, 1);B new …

高校为什么需要大数据挖掘平台?

目前数据挖掘已经成为各种应用领域的重要技术,大学数据挖掘课程的开放已经出现。数据挖掘课程整合了多门学科知识。该课程包括各种理论知识,也离不开相关的实用技术。整个教学过程是培养和提高学生全面创新和解决问题的能力。过去,教学过程理…

IDEA插件分享,支持接口调试!

平时我们在写完接口需要填入postman、Apipost等工具进行接口调试,今天给大家推荐一款IDEA插件Apipost-helper,写完代码直接可以进行调试,而且支持生成接口文档,JAVA工程师必用! 可以点击下方链接或在插件商店中搜索安…

netty基本用法, 拆包、粘包等常见解决方案,看本文即可,不做原理说明,只进行实战操作

netty的基本用法 完整的介绍了netty最基本的发送byte、string、byteBuf发送接收数据,以及拆包、粘包演示,拆包、粘包的解决方案。看netty的用法只看本篇足以,本篇只说用法,不讲底层原理。 详细demo的git地址 示例的通用代码 客…

KT148A语音芯片的下载板子导入F1A声音下载操作多次只成功一次都没有声音

一、问题简介 为什么我使用KT148A语音芯片的下载板子,导入声音下载,操作好多次,只成功了一次,后面始终都没有声音 芯片分为两个版本,分别是按键版本和一线串口版本。看一下样品卡的校验码: 详细描述 1、如…

数学建模比赛中常用的建模提示词(数模prompt)

以下为数学建模比赛中常用的建模提示词,希望对你有所帮助! 帮我总结一下数学建模有哪些预测类算法? 灰色预测模型级比检验是什么意思? 描述一下BP神经网络算法的建模步骤 对于分类变量与分类变量相关性分析用什么算法 前10年的数据分别是1&a…

京东店铺所有商品数据接口(JD.item_search_shop)

京东店铺所有商品数据接口是一种允许开发者在其应用程序中调用京东店铺所有商品数据的API接口。利用这一接口,开发者可以获取京东店铺的所有商品信息,包括商品标题、SKU信息、价格、优惠价、收藏数、销量、SKU图、标题、详情页图片等。 通过京东店铺所有…

C风格数组和std::array有什么区别

2023年11月6日,周一下午 C风格数组在 C/C 中没有值语义。当将内置数组作为函数参数或返回值时,实际传递的是指向数组首元素的指针,而不是整个数组的副本。这意味着对函数参数中的数组或返回值返回的数组进行修改会影响原始数组,因…

第七章:计算failure概率

文章目录 第七章Random testingSerial BlocksParallel Blocks如何创建 reliability 的 Block digramsmarkov model如何根据系统来构建马尔科夫计算模型software reliability growthbasic execution time model观察 failure操作概要(operational profiles)Time理解 reliablity …

开源不止,创新澎湃 | 2023开源产业生态大会六大专题抢“鲜”看!马上报名,锁定席位!

2023开源产业生态大会 作为上海市经济和信息化委员会、上海市科学技术协会指导的年度重点活动,2023开源产业生态大会即将于12月19日在上海开幕。大会将展示最前沿的开源创新项目,从金融科技、人工智能到智能汽车、智能制造、工业软件等一系列颠覆性应用…

Springboot自动配置那些事

Spring Boot中默认会扫描的启动类对应的子包下面的类,但是项目引入的其他包下面的类要加入到IOC中必须要有所说明,以下说到的自动配置就是干这个活的,springboot就会把配置中的类加载到ioc容器中。 (1)自动配置注册文…

Gradle version对应的 Gradle Plugin version

Gradle version对应的 Gradle Plugin version

如何成为前1%的程序员

目录 大量同质化的知识,会降低这些知识的含金量。 1、拥抱调试 2、质量胜于数量 3、读取代码 4、贡献 5、工具 如果你想成为前1%的程序员,你必须遵循1%的程序员做什么,了解其他99%的人不做什么。在现代,我们有各种学习平台…

伦敦金周末可以交易吗,黄金休市时间是那些?

伦敦金是国际性投资产品,主要交易中心有亚洲、欧洲和美洲,在时差的作用下,三大市场相互连接,形成了全天24小时几乎不间断的交易时间,也为炒金者们提供了充分的操作机会。即便如此,在一些特定的时间段内&…

C语言打印1/1+1/2+1/3.....+1/50结果

while语句&#xff1a; #define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> int main() {int i 1;double sum 0.0;while(i < 50){sum 1.0/i;i;}printf("sum %lf\n",sum);return 0; } for语句&#xff1a; #define _CRT_SECURE_NO_WARNINGS 1#includ…

JTS: 15 Angle 角度计算

这里写目录标题 版本代码 版本 org.locationtech.jts:jts-core:1.19.0 链接: github 代码 package pers.stu.algorithm;import org.locationtech.jts.algorithm.Angle; import org.locationtech.jts.geom.Coordinate; import org.locationtech.jts.geom.GeometryFactory; imp…

深入浅出继承

目录 一、继承的概念 二、继承的定义 2.1 继承格式 2.2 继承方式与访问限定符 2.3 继承方式和访问限定符 2.4 默认继承方式 三、基类与派生类对象赋值转换 四、继承中的作用域 六、派生类默认成员函数 七、继承与友元 八、继承与静态成员 一、继承的概念 继承&…

【c++】c++类的大小的计算和this指针

文章目录 1.类的大小如何计算&#xff1f;2.类内部的this指针3.this指针的特性 本文为作者关于c类学习过程中的小小总结 1.类的大小如何计算&#xff1f; c的类由成员变量和成员函数等组成&#xff0c;不同于c中的结构体只有成员变量&#xff0c;但类大小的计算方法和结构体的…