深入浅出Apache SeaTunnel SQL Server Sink Connector

news2025/1/25 4:43:24

在大数据时代,数据的迁移和流动已经变得日益重要。为了使数据能够更加高效地从一个源流向另一个目标,我们需要可靠、高效和易于配置的工具。今天,我们将介绍 JDBC SQL Server Sink Connector,这是一个专为 SQL Server 设计的连接器,能够确保数据的精准、高效传输。

file

不仅如此,它还支持多种流处理引擎,例如 Spark、Flink 和 SeatTunnel Zeta。无论您是初学者还是有经验的开发者,本文都将为您提供关于如何最大限度地利用此连接器的深入见解。

支持 SQL Server 版本

  • 服务器:2008(或更高版本,仅供信息参考)

支持的引擎

Spark
Flink
Seatunnel Zeta

主要特点

  • [x] 精准一次性
  • [x] CDC(变更数据捕获)

使用 Xa 事务 来确保 精准一次性。因此,仅支持支持 Xa 事务 的数据库的 精准一次性。您可以设置 is_exactly_once=true 来启用它。

描述

通过 JDBC 写入数据。支持批处理模式和流处理模式,支持并发写入,支持精准一次性语义(使用 XA 事务保证)。

支持的数据源信息

数据源支持的版本驱动URLMaven
SQL Server支持版本 >= 2008com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433下载

数据库依赖

请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录
例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

SQL Server 数据类型Seatunnel 数据类型
BITBOOLEAN
TINYINT
SMALLINT
SHORT
INTEGERINT
BIGINTLONG
DECIMAL
NUMERIC
MONEY
SMALLMONEY
DECIMAL((指定列的指定列大小)+1,
(获取指定列的小数点右边的数字的数量。)))
REALFLOAT
FLOATDOUBLE
CHAR
NCHAR
VARCHAR
NTEXT
NVARCHAR
TEXT
STRING
DATELOCAL_DATE
TIMELOCAL_TIME
DATETIME
DATETIME2
SMALLDATETIME
DATETIMEOFFSET
LOCAL_DATE_TIME
TIMESTAMP
BINARY
VARBINARY
IMAGE
UNKNOWN
尚不支持

Sink 选项

名称类型必需默认值描述
url字符串-JDBC 连接的 URL。例如:jdbc:sqlserver://localhost:1433;databaseName=mydatabase
driver字符串-用于连接到远程数据源的 JDBC 类名,如果使用 SQL Server,则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
user字符串-连接实例的用户名
password字符串-连接实例的密码
query字符串-使用此 SQL 将上游输入数据写入数据库。例如 INSERT ...query 具有更高的优先级
database字符串-使用此 databasetable-name 自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query 互斥,优先级更高。
table字符串-使用数据库和此表名自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query 互斥,优先级更高。
primary_keys数组-此选项用于支持自动生成 SQL 时的 insertdeleteupdate 等操作。
support_upsert_by_query_primary_key_exist布尔false选择是否使用 INSERT SQL、UPDATE SQL 来处理基于查询主键是否存在的更新事件(INSERT、UPDATE_AFTER)。只有在数据库不支持 upsert 语法时才使用此配置。注意:此方法性能较低。
connection_check_timeout_sec整数30等待用于验证连接的数据库操作完成的秒数。
max_retries整数0重试提交失败(executeBatch)的次数。
batch_size整数1000用于批量写入的记录数量达到 batch_size 或时间达到 checkpoint.interval 时,数据将刷新到数据库。
is_exactly_once布尔false是否启用精准一次性语义,将使用 XA 事务。如果开启,需要设置 xa_data_source_class_name
generate_sink_sql布尔false基于要写入的数据库表生成 SQL 语句。
xa_data_source_class_name字符串-数据库驱动程序的 XA 数据源类名,例如,SQL Server 为 com.microsoft.sqlserver.jdbc.SQLServerXADataSource,其他数据源请参考附录。
max_commit_attempts整数3事务提交失败的重试次数。
transaction_timeout_sec整数-1事务打开后的超时时间,默认值为 -1(永不超时)。请注意,设置超时可能会影响精准一次性语义。
auto_commit布尔true默认启用自动事务提交。
common-options-Sink 插件通用参数,请参考 Sink Common Options 以获取详细信息。
## 提示

如果未设置 partition_column,则将以单一并发运行;如果设置了 partition_column,则将根据任务的并发度执行并行操作。

任务示例

简单:

这是一个读取 Sql Server 数据并将其直接插入另一个表中的示例

env {
  # 您可以在此处设置引擎配置
  execution.parallelism = 10
}
source {
  # 这是一个示例源插件,仅用于测试和演示源插件的功能
  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "select * from column_type_test.dbo.full_types_jdbc"
    # 并行分片读取字段
    partition_column = "id"
    # 片段数量
    partition_num = 10
  }
}

transform {

  # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"

  }  # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}

CDC(Change Data Capture)事件

我们还支持 CDC 变更数据,此时需要配置数据库、表和主键。

Jdbc {
  source_table_name = "customers"
  driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
  url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  user = SA
  password = "Y.sa123456"
  generate_sink_sql = true
  database = "column_type_test"
  table = "dbo.full_types_sink"
  batch_size = 100
  primary_keys = ["id"]
}

精确一次性 Sink

事务性写入可能会更慢,但对数据更准确


  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"
    is_exactly_once = "true"

    xa_data_source_class_name = "com.microsoft.sqlserver.jdbc.SQLServerXADataSource"

  }  # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1125101.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

嵌入式linux总线设备驱动模型分析

嵌入式linux系统按照,分层,抽象的思想,按照这样的思想来设计我们的程序可以更容易写出耦合性低、独立性强、可重用性强的代码。 Linux内核中更是存在着更多的分离、分层思想的代码,platform平台设备驱动就是用了这样的思想。本篇…

机器学习(新手入门)-线性回归 #房价预测

题目:给定数据集dataSet,每一行代表一组数据记录,每组数据记录中,第一个值为房屋面积(单位:平方英尺),第二个值为房屋中的房间数,第三个值为房价(单位:千美元…

pv操作题目笔记

对于 pv 操作分以下几步走 什么是pv操作 PV操作在进程同步中通常指的是信号量(Semaphore)操作。信号量是一种用于控制多个并发进程或线程之间的同步和互斥访问的同步工具。PV操作通常涉及两个基本操作:P操作(wait操作&#xff0…

算法通关村第十一关青铜挑战——移位运算详解

大家好,我是怒码少年小码。 计算机到底是怎么处理数字的? 数字在计算机中的表示 机器数 一个数在计算机中的二进制表示形式,叫做这个数的机器数。 机器数是带符号的,在计算机用一个数的最高位存放符号,正数为0&am…

【剑指Offer】:删除链表中的倒数第N个节点(此题是LeetCode上面的)剑指Offer上面是链表中的倒数第K个节点

给定一个链表,删除链表的倒数第 n 个结点,并且返回链表的头结点 示例 1: 输入:head [1,2,3,4,5], n 2 输出:[1,2,3,5] 示例 2: 输入:head [1], n 1 输出:[] 示例 3:…

计算机网路第3章-运输层

概述和运输层服务 运输层协议为运行在不同主机上的应用进程提供了逻辑通信,从应用程序角度看,通过使用逻辑通信,就好像运行在不同主机上的进程直接相连在一起一样。 运输层和网络层的关系 网络层提供主机之间的通信,而运输层提…

面试官的一句话,让五年功能测试老手彻夜难眠!

小王是一名软件测试工程师,已经在目前的公司做了四五年的功能测试。虽然一直表现得非常努力,但他还是没能躲过裁员。只能被动跳槽,寻找更好的职业机会。 然而事情并没有像他想象中那样顺利。在多次面试中小王屡屡碰壁,被面试官吐槽…

leetcode:面试题 17.04. 消失的数字(找单身狗/排序/公式)

一、题目: 函数原型:int missingNumber(int* nums, int numsSize) 二、思路: 思路1 利用“找单身狗”的思路(n^n0;0^nn),数组中有0-n的数字,但缺失了一个数字x。将这些数字按位异或0…

Antv G6入门之旅--combo图

目录 什么是AntV G6 G6 的特性 G6 文档 安装 1 在项目中使用 NPM 包引入 2 在 HTML 中使用 CDN 引入 使用 Step 1 创建容器 Step 2 数据准备 Step 3 创建关系图 Step 4 配置数据源,渲染 React 中使用 G6 Combo图 什么是AntV G6 G6 是一个图可视化引擎…

深度学习模型不确定性方法对比

©PaperWeekly 原创 作者|崔克楠 学校|上海交通大学博士生 研究方向|异构信息网络、推荐系统 本文以 NeurIPS 2019 的 Can You Trust Your Model’s Uncertainty? Evaluating Predictive Uncertainty Under Dataset Shift 论文为主线…

机器学习笔记 - 特斯拉的占用网络简述

一、简述 ​ 2022 年,特斯拉宣布即将在其车辆中发布全新算法。该算法被称为occupancy networks,它应该是对Tesla 的HydraNet 的改进。 自动驾驶汽车行业在技术上分为两类:基于视觉的系统和基于激光雷达的系统。后者使用激光传感器来确定物体的存在和距离,而视觉系统…

acwing第 126 场周赛 (扩展字符串)

5281. 扩展字符串 一、题目要求 某字符串序列 s0,s1,s2,… 的生成规律如下: s0 DKER EPH VOS GOLNJ ER RKH HNG OI RKH UOPMGB CPH VOS FSQVB DLMM VOS QETH SQBsnDKER EPH VOS GOLNJ UKLMH QHNGLNJ Asn−1AB CPH VOS FSQVB DLMM VOS QHNG Asn−1AB,其…

day10_面向对象_抽象_接口

今日内容 1.作业 2.final 3.抽象 4.接口 零、复习 按从大到小的顺序写出访问修饰符 public > protected > package (default)> private static修饰属性和方法的特点在内存的特点: 在方法区(不是在堆,也不是在栈)初始化的特点: 随类(字节码文件)加载到内存已经初始化使…

基于大数据的时间序列股价预测分析与可视化 - lstm 计算机竞赛

文章目录 1 前言2 时间序列的由来2.1 四种模型的名称: 3 数据预览4 理论公式4.1 协方差4.2 相关系数4.3 scikit-learn计算相关性 5 金融数据的时序分析5.1 数据概况5.2 序列变化情况计算 最后 1 前言 🔥 优质竞赛项目系列,今天要分享的是 &…

Redis不止能存储字符串,还有List、Set、Hash、Zset,用对了能给你带来哪些优势?

文章目录 🌟 Redis五大数据类型的应用场景🍊 一、String🍊 二、Hash🍊 三、List🍊 四、Set🍊 五、Zset 📕我是廖志伟,一名Java开发工程师、Java领域优质创作者、CSDN博客专家、51CTO…

1300*B. Road Construction(构造菊花图)

Problem - 330B - Codeforces 解析&#xff1a; 1到任一点距离不超过二&#xff0c;并且有部分点不可以连边&#xff0c;直接统计所有不能连边的点&#xff0c;从之外的点中选一个点当作中心&#xff0c;构造菊花图即可。 #include<bits/stdc.h> using namespace std; i…

CSS常见选择器总结

1.简单选择器 简单选择器是开发中使用最多的选择器&#xff0c;包含&#xff1a; 元素选择器&#xff0c;使用元素的名称 类选择器&#xff0c;使用.类名 id选择器&#xff0c;使用#id id注意事项&#xff1a; 一个HTML文档里面的id值 是唯一的&#xff0c;不能重复 id值如…

阿里云服务器x86计算架构ECS实例规格汇总

阿里云企业级服务器基于X86架构的实例规格&#xff0c;每一个vCPU都对应一个处理器核心的超线程&#xff0c;基于ARM架构的实例规格&#xff0c;每一个vCPU都对应一个处理器的物理核心&#xff0c;具有性能稳定且资源独享的特点。阿里云服务器网aliyunfuwuqi.com分享阿里云企业…

特约|数码转型思考:Web3.0与银行

日前&#xff0c;欧科云链研究院发布重磅报告&#xff0c;引发银行界及金融监管机构广泛关注。通过拆解全球70余家银行的加密布局&#xff0c;报告认为&#xff0c;随着全球采用率的提升与相关技术的成熟&#xff0c;加密资产已成为银行业不容忽视也不能错过的创新领域。 作为…

尚硅谷kafka3.0.0

目录 &#x1f483;概述 ⛹定义 ​编辑⛹消息队列 &#x1f938;‍♂️消息队列应用场景 ​编辑&#x1f938;‍♂️两种模式&#xff1a;点对点、发布订阅 ​编辑⛹基本概念 &#x1f483;Kafka安装 ⛹ zookeeper安装 ⛹集群规划 ​编辑⛹流程 ⛹原神启动 &#x1f938;‍♂️…