SeaTunnel引擎下的SQL Server CDC解决方案:构建高效数据管道

news2025/1/19 3:17:53

file

在快速发展的数据驱动时代,实时数据处理已经成为企业决策和运营的关键因素。特别是在处理来自各种数据源的信息时,如何确保数据的及时、准确和高效同步变得尤为重要。本文着重介绍了如何利用 SqlServer CDC 源连接器在 SeaTunnel 框架下实现 SQL Server 到其他数据系统的实时数据同步,这对于希望提升数据处理能力和实时数据分析的企业来说,具有重要的实践意义。

SQL Server CDC

SqlServer CDC 源连接器

支持 SQL Server 版本

  • 服务器:2019(或更高版本,仅供参考)

支持引擎

SeaTunnel Zeta
Flink

主要特性

  • 批处理
  • 流处理
  • 精确一次
  • 列投影
  • 并行处理
  • 支持用户自定义分片

描述

SqlServer CDC 连接器允许从 SqlServer 数据库读取快照数据和增量数据。本文档描述了如何设置 SqlServer CDC 连接器以在 SqlServer 数据库上运行 SQL 查询。

支持的数据源信息

数据源支持的版本驱动URLMaven
SqlServer
  • 服务器:2019(或更高版本,仅供参考)
com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433;databaseName=column_type_test下载

安装 Jdbc 驱动

请下载并将 SqlServer 驱动放在 ${SEATUNNEL_HOME}/lib/ 目录下。例如:cp mssql-jdbc-xxx.jar ${SEATUNNEL_HOME}/lib/

数据类型映射

SQL Server 数据类型SeaTunnel 数据类型
CHAR
VARCHAR
NCHAR
NVARCHAR
STRUCT
CLOB
LONGVARCHAR
LONGNVARCHAR
STRING
BLOBBYTES
INTEGERINT
SMALLINT
TINYINT
SMALLINT
BIGINTBIGINT
FLOAT
REAL
FLOAT
DOUBLEDOUBLE
NUMERIC
DECIMAL(column.length(), column.scale().orElse(0))
DECIMAL(column.length(), column.scale().orElse(0))
TIMESTAMPTIMESTAMP
DATEDATE
TIMETIME
BOOLEAN
BIT
BOOLEAN

源选项

名称类型必需默认值描述
username字符串-连接数据库服务器时使用的用户名。
password字符串-连接数据库服务器时使用的密码。
database-names列表-需要监控的数据库名。
table-names列表-表名为模式名和表名的组合(databaseName.schemaName.tableName)。
base-url字符串-必须包含数据库的URL,如 "jdbc:sqlserver://localhost:1433;databaseName=test"。
startup.mode枚举INITIALSqlServer CDC 消费者的可选启动模式,有效枚举为 "initial"、"earliest"、"latest" 和 "specific"。
startup.timestamp长整型-从指定的纪元时间戳(以毫秒为单位)开始。
注意,当使用 "startup.mode" 选项为 'timestamp' 时,此选项是必需的。
startup.specific-offset.file字符串-从指定的 binlog 文件名开始。
注意,当 "startup.mode" 选项使用 'specific' 时,此选项是必需的。
startup.specific-offset.pos长整型-从指定的 binlog 文件位置开始。
注意,当 "startup.mode" 选项使用 'specific' 时,此选项是必需的。
stop.mode枚举NEVERSqlServer CDC 消费者的可选停止模式,有效枚举为 "never"。
stop.timestamp长整型-从指定的纪元时间戳(以毫秒为单位)停止。
注意,当 "stop.mode" 选项使用 'timestamp' 时,此选项是必需的。
stop.specific-offset.file字符串-从指定的 binlog 文件名停止。
注意,当 "stop.mode" 选项使用 'specific' 时,此选项是必需的。
stop.specific-offset.pos长整型-从指定的 binlog 文件位置停止。
注意,当 "stop.mode" 选项使用 'specific' 时,此选项是必需的。
incremental.parallelism整型1增量阶段中并行读取器的数量。
snapshot.split.size整型8096表快照的分割大小(行数),快照期间的表会被分割成多个分片进行读取。
snapshot.fetch.size整型1024读取表快照时每次轮询的最大提取量。
server-time-zone字符串UTC数据库服务器中的会话时区。
connect.timeout时长30s连接器尝试连接到数据库服务器后等待超时的最大时间。
connect.max-retries整型3连接器尝试建立数据库服务器连接的最大重试次数。
connection.pool.size整型20连接池大小。
chunk-key.even-distribution.factor.upper-bound双精度浮点型100分块键分布因子的上界。此因子用于判断表数据是否均匀分布。如果计算出的分布因子小于或等于此上界值(即 (MAX(id) - MIN(id) + 1) / 行数),则表分块将被优化为均匀分布。否则,如果分布因子更大,则表将被认为是不均匀分布的,并且如果估计的分片数超过 sample-sharding.threshold 指定的值,将使用基于抽样的分片策略。默认值为 100.0。
chunk-key.even-distribution.factor.lower-bound双精度浮点型0.05分块键分布因子的下界。此因子用于判断表数据是否均匀分布。如果计算出的分布因子大于或等于此下界值(即 (MAX(id) - MIN(id) + 1) / 行数),则表分块将被优化为均匀分布。否则,如果分布因子更小,则表将被认为是不均匀分布的,并且如果估计的分片数超过 sample-sharding.threshold 指定的值,将使用基于抽样的分片策略。默认值为 0.05。
sample-sharding.threshold整型1000触发抽样分片策略的估计分片数阈值。当分布因子超出 chunk-key.even-distribution.factor.upper-boundchunk-key.even-distribution.factor.lower-bound 指定的范围,并且估计的分片数(计算为近似行数 / 分块大小)超过此阈值时,将使用抽样分片策略。这可以帮助更有效地处理大型数据集。默认值为1000分片。
inverse-sampling.rate整型1000抽样分片策略中使用的抽样率的倒数。例如,如果这个值设置为1000,意味着抽样过程中应用了1/1000的抽样率。这个选项提供了在控制抽样粒度的灵活性,从而影响最终的分片数量。特别是在处理非常大的数据集时,更低的抽样率是首选。默认值为1000。
exactly_once布尔型true启用精确一次语义。
debezium.*配置-将Debezium的属性传递给用于从SqlServer服务器捕获数据变化的Debezium嵌入式引擎。
查看Debezium的SqlServer连接器属性获取更多信息
format枚举DEFAULTSqlServer CDC 的可选输出格式,有效枚举为 "DEFAULT"、"COMPATIBLE_DEBEZIUM_JSON"。
common-options-源插件的通用参数,请参考源通用选项获取详细信息。

任务示例

初始读取简单示例

这是一个流模式CDC初始化读取的示例,成功读取表数据后将进行增量读取。以下SQL DDL仅供参考。

env {
  # 在此处设置引擎配置
  execution.parallelism = 1
  job.mode = "STREAMING"
  execution.checkpoint.interval = 5000
}

source {
  # 仅用于测试和演示功能的示例源插件
  SqlServer-CDC {
    result_table_name = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode="initial"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  }
}

transform {
}

sink {
  console {
    source_table_name = "customers"
  }

增量读取简单示例

这是一个增量阅读示例,用于阅读变更数据并打印。

env {
  # 在此处设置引擎配置
  execution.parallelism = 1
  job.mode = "STREAMING"
  execution.checkpoint.interval = 5000
}

source {
  # 仅用于测试和演示功能的示例源插件
  SqlServer-CDC {
    # 设置精确一次读取
    exactly_once=true 
    result_table_name = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode="latest"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  }
}

transform {
}

sink {
  console {
    source_table_name = "customers"
  }
}

随着数据处理需求的不断增长和实时数据同步的重要性日益凸显,SqlServer CDC 源连接器在 SeaTunnel 生态系统中扮演着至关重要的角色。

通过本文的深入解析,我们希望您能够更好地理解并利用这一强大工具,从而实现数据流的高效、稳定和精准同步。

无论您是数据工程师、系统架构师还是业务分析师,掌握如何在 SeaTunnel 中部署和优化 SQL Server CDC 连接器,都将为您的数据处理能力带来显著提升。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1287045.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

arcgis投影栅格不可用

1、使用【投影栅格】工具进行栅格数据投影转换时报错。 解决方法:如果使用的是arcgis10.5及以下的版本,则需要更换更高的版本,因为这个是软件问题,需要更换到arcgis10.6及以上版本,更高级别的版本已经修复了这个问题。…

Selenium自动化测试 —— 模拟鼠标键盘的操作事件

鼠标操作事件 在实际的web产品测试中,对于鼠标的操作,不单单只有click(),有时候还要用到右击、双击、拖动等操作,这些操作包含在ActionChains类中。 ActionChains类中鼠标操作常用方法: 首先导入ActionChains类&#…

基于Springboot的社区医院管理服务系统(有报告)。Javaee项目,springboot项目。

演示视频: 基于Springboot的社区医院管理服务系统(有报告)。Javaee项目,springboot项目。 项目介绍: 采用M(model)V(view)C(controller)三层体系…

关于神舟-战神TA5NS系统重装问题

加装固态卡在log处无法开机问题 下面是我的步骤 1.按f7选择pe安装系统,然后发现卡在战神log处不转动 2.下载驱动 TA5NS驱动地址 下载RAID驱动(如果没有私信我,我网盘里有),拷到u盘中,然后进入pe系统里面…

Python----Pandas

目录 Series属性 DataFrame的属性 Pandas的CSV文件 Pandas数据处理 Pandas的主要数据结构是Series(一维数据)与DataFrame(二维数据) Series属性 Series的属性如下: 属性描述pandas.Series(data,index,dtype,nam…

轨道交通故障预测与健康管理PHM系统的应用

轨道交通是现代城市中不可或缺的交通方式,它为人们提供了快速、高效和可靠的出行方式。然而,由于轨道交通系统的复杂性和高负荷运行,设备故障和运营中断问题时有发生。为了提高轨道交通系统的可靠性和安全性,故障预测与健康管理&a…

Java的TPC通信

TPC通信-快速入门 TPC通信-客户端开发 TPC通信-服务端开发 TPC通信-多发多收 客户端 服务端 TPC通信-支持与多个客户端同时通信 服务端 独立线程对象

看懂lscpu的输出

文章目录 1. lscpu1.1 Architecture1.2 逻辑核心数1.3 缓存1.4 CPU型号1.5 NUMA架构1.5.1 CPU多核架构1.5.2 多CPU Socket架构 2. cat /proc/cpuinfo2.1 关键字段 1. lscpu 通过lscpu查看当前系统的CPU信息。 [hadoopserver3 ~]$ lscpuArchitecture: x86_64 …

使用晶振遇到的两个问题

并联电阻的问题 在一些方案中,晶振并联1MΩ电阻时,程序运行正常,而在没有1MΩ电阻的情况下,程序运行有滞后及无法运行现象发生。 原因分析: 在无源晶振应用方案中,两个外接电容能够微调晶振产生的时钟频率…

Spring容器启动过程中的自定义操作插口汇总

目录标题 PostConstruct注解EventListener方式InitializingBean的afterPropertiesSet方法实现ApplicationRunner接口重写run方法实现AplicationContextAware接口重写setApplicationContext实现ServletContextListener接口contextInitialized方法实现ServletContextAware接口set…

性能测试流程、指标及常见问题!

1.介绍性能测试流程 a.性能需求分析(评审) 基于接口或者场景(全链路)的性能测试指标,一般是tps(每秒事务数,这里都是通过的事务)及art(平均响应时间) b.了解…

基于ssm vue的风景文化管理平台源码和论文

摘 要 随着信息化时代的到来,管理系统都趋向于智能化、系统化,基于vue的木里风景文化管理平台也不例外,但目前国内的市场仍都使用人工管理,市场规模越来越大,同时信息量也越来越庞大,人工管理显然已无法应对…

2-1、地址加法器CS:IP

语雀原文链接 文章目录 1、CPU组成2、通用寄存器16位寄存器的存储16位寄存器兼容8位word 和 byte进位问题 3、地址加法器不同的段地址和偏移地址表示同一个物理地址偏移地址的范围一个段的起始地址一定是16的倍数 4、CS:IPCS IP工作过程jmp修改CS:IP 5、DS和[address]DS和[add…

四.多表查询

多表查询 1.一个案例引发的多表连接1.1案例说明1.2 笛卡尔积(或交叉连接)的理解1.3案例分析与问题解决 2.多表查询分类讲解分类1:等值连接vs非等值连接分类2:自连接vs非自连接分类3:内连接vs外连接 3.SQL99语法实现多表…

记录华为云服务器(Linux 可视化 宝塔面板)-- Nginx配置出现500错误记录

文章目录 1、路由配置,访问显示500如有启发,可点赞收藏哟~ 1、路由配置,访问显示500 错误如图显示500 解决思路如下 1、先查看错误日志 错误日志存放位置 提示 /login配置的文件有问题 开始配置如下图 修改前 修改后(即在/l…

题目:小明的彩灯(蓝桥OJ 1276)

题目描述&#xff1a; 解题思路&#xff1a; 一段连续区间加减&#xff0c;采用差分。最终每个元素结果与0比较大小&#xff0c;比0小即负数输出0。 题解&#xff1a; #include<bits/stdc.h> using namespace std;using ll long long; const int N 1e5 10; ll a[N],…

深度学习之网络优化与正则化

视频链接&#xff1a;7.1 神经网络优化的特点_哔哩哔哩_bilibili 神经网络优化的特点 网络优化的难点 &#xff08;1&#xff09;网络结构差异大&#xff1a;不同模型之间的结构差异大——没有通用的优化算法、超参数多 &#xff08;2&#xff09;非凸优化问题&#xff1a;…

gitee上合并分支

点击pull requests 这个合并代码的位置找了半天没有找到&#xff0c;最后还是搜到才知道

华为云软件冗余依赖智能消除技术Slimming取得重大突破,相应文章已被软工顶会ICSE 2024录用

由于构建工具对软件库的自动化配置管理&#xff0c;使得现代软件项目在版本演化的过程中&#xff0c;引入大量的第三方软件库&#xff0c;依赖树结构日益臃肿。然而&#xff0c;实际上很多引入的软件库并未被真正使用。臃肿的依赖在资源受限的设备上将严重影响代码的性能、增加…

C++智能指针及简单实现

C智能指针 堆内存、栈内存与静态内存静态内存栈内存堆内存 动态内存管理new、delete运算符智能指针实现智能指针 shared_ptr智能指针的线程安全问题解决 unique_ptrweak_ptr循环引用 思维导图本模块思路 动态内存管理 - cppreference.com 堆内存、栈内存与静态内存 静态内存 …