基于 Flink CDC 的实时同步系统

news2025/1/9 1:06:46

摘要:本文整理自科杰科技大数据架构师张军,在 FFA 2022 数据集成专场的分享。本篇内容主要分为四个部分:

  1. 功能概述

  2. 架构设计

  3. 技术挑战

  4. 生产实践

Tips:点击「阅读原文」查看原文视频&演讲 ppt

1e87668345f4c13b7bc6b03857e5f608.jpeg

科杰科技是专门做大数据服务的供应商,目前的客户包括能源、金融、证券等各个行业。科杰科技产品的底层是基于湖仓一体的基础数据平台,在数据平台之上有离线、实时、机器学习等各种系统。我主要负责基于 Flink、Iceberg、K8s 的底层基础设施建设。今天将主要和大家分享,上图中框出来的子系统,即基于 Flink CDC 的实时数据同步系统。

01

功能概述

001aebc042ce8ea1e838fc93776a96a6.jpeg

我们系统的主要的功能有如下几个:

1. 可视化操作。我们做了后台的管理系统,是希望用户在不懂任何代码的情况下,通过点击鼠标就能配置出同步任务做数据同步。

2. 支持整库同步、多表同步。

3. DDL 支持:源端的 Schema 的变更也要同步到目标端。

4. 数据库、表、字段映射。

5. 丰富的数据源支持。目前输入端支持四种常见的关系型数据库,MySQL、Postgre、SQL Server、Oracle,输出端除了这四个数据库之外,还包含 Kafka 和 Iceberg。

6. 丰富的数据类型支持。对输入端的四种关系型数据库,我们常用的所有数据类型都会支持,包括二进制类型。

7. UDF 函数、过滤条件。UDF 函数是指我们在同步过程中,做一些数据转换。过滤条件是指我们会在同步的过程中,加一些过滤条件,只同步想要的数据。

8. 选取字段、添加变量字段。选取字段是指用户可以选择想要的字段进行同步。添加变量是指在同步的过程中,可以手工添加一些字段,比如时间戳或者表名等。

660195e87b9390c85138cd519aee618c.jpeg

市面上有很多实时同步的系统,最终我们选用了 Flink CDC 做实时同步系统的底层技术架构。主要是因为 Flink CDC 有一些独有的优势,包括全量同步、增量同步、全量+增量同步,还有底层基于 Flink 做的分布式计算引擎。

通过 Flink CDC 这套架构,想实现我们现有产品的需求,目前来看还有一些不足。

1. DDL 的支持:PostgreSQL、Oracle 数据库无法获取 Schema 变更的事件,无法捕获相应的 DDL 操作。

2. 整库同步:通过 Flink CDC  的 API 可以捕获表结构的变更信息,但是现有的 Flink Connector 无法将新增的表、字段写入目标端。

3. 需要预知 Schema:Flink 任务需要提前知道表结构的 Schema,然后构建任务,无法实现不重启的情况下动态处理新增表或者字段。

02

架构设计

80ae5eb9824dddf47958993264df1ab1.jpeg

接下来从技术角度给大家分享一下我们系统的设计架构,从上图中可以看到,一共分为三层。

最上面一层是输入端。基于 Flink CDC API 的方式读数据库进行数据抽取,然后把这些数据和 Schema 的信息发到中间的 Kafka,Kafka 是我们的中间缓冲层。最下面一层是输出端,会从 Kafka 读取输入端输入的数据。

在输出端这一层可以看到,首先进行过滤,常用的 SQL 表达式都可以做过滤条件。过滤后对字段应用一些 UDF,比如数据脱敏、加密等等。接下来根据 DB 和 Table 对数据进行 Keyby 分组,然后使用 KeyedProcessFunction 函数对每个表的数据进行一些处理,比如创建表、添加或者修改字段、插入数据等等。

当配置完任务之后,最后我们分别把 Source 和 Sink 的任务提交到运维中心,运维中心会对任务进行启动、停止、查看统计指标、查看任务状态等一系列操作。最后我们的任务支持在 Yarn 和 K8s 上运行,用户可以根据自己的情况进行选择。

8d06781d50cdbc26b4b4e28b89916af1.jpeg

在后台管理系统,用户可以通过配置输入端和输出端,配置需要同步的任务。任务会生成两个配置文件,分别是输入端的配置文件和输出端的配置文件,然后这两个配置文件会分别作为输入端和输出端的启动参数传给两个 Flink 任务。

795f297c514af55043716ea173242c6f.jpeg

这部分主要是想分享下,对于无法获取 DDL 事件的情况我们该如何处理呢?

其实有一些数据库,比如 MySQL,是可以通过 Flink CDC  来获取 Schema 的变更信息的,但是为了代码的逻辑统一,同时适配 Flink CDC  拿不到 Schema 变更的数据库。我们做了代码统一的处理,用一套架构完成数据和 Schema 的抽取和封装。

我们通过 JDBC 的方式,从源数据库把 Schema 的信息查出来,放到 Flink 的 State 里。当下一条数据来的时候,跟 State 里面的 Schema 数据进行对比。相同就不做任何处理,不同就再次查询一下 Schema 的信息,更新到 Flink State 里。同时将从 Flink CDC 拿到的数据和这条数据对应的 Schema 信息,封装成消息体,发送给中间层的 Kafka。从 Schema 读取的信息包含数据的类型、长度、精度,是否是主键等等,格式和 debezium-json 差不多。

Kafka 缓冲层可以用来实现以下几个功能。

在解耦方面:将 Source 和 Sink 解耦; 多个输出端避免重复抽取。比如我想从 MySQL 抽取一些数据,把它同步到 Iceberg 做一些离线的分析。同时又同步到 Kafka,做一些实时的数据处理。这种情况就可以从源端只抽取一次,减少对源端数据库的压力;Sink 出现故障避免 Source 阻塞,类似 flume 的 channel 的功能。

在 DB 对应 Topic 方面:一个数据库里面的数据抽取到一个 Topic;每个 Topic 一个 Partition;单表重放顺序有保证。

fb2115431cfd8f1865bb56a2ed1bbef8.jpeg

输出端和输入端一样,读取后端生成的配置文件作为它的参数,然后使用一些过滤条件,UDF 转换条件等等,从 Kafka 读取数据,进行数据处理。

在数据处理的时候,因为每个输出源的处理逻辑不一样,所以分成以下三类。

1. 写入 RDBMS。通过 JDBC 来操作数据库,包含 DDL、DML。

2. 写入 Iceberg。重写 Flink 写入 Icebrg 逻辑,使用原始 API 写入数据,Commit Snapshot。

3. 写入 Kafka。使用 Flink Kafka Connector 写入 Kafka。

528a2a92e313b22559c52ac9dba8d803.jpeg

运维中心可以对数据进行如下处理:

1. 任务的管理:包含任务的启动、停止、暂停等等。

2. 查看指标:监控一些数据,包含同步任务的数据条数和数据大小。

3. 配置监控报警:同步任务发生故障时,发送报警,包括邮件、短信等等。

4. 查看日志:查看任务启动的日志、任务运行过程中的日志。

03

技术挑战

25d70670e1978c7220f90ec5745129d3.jpeg

下面列举一些主要的技术挑战。

1. 读取增量 Schema:获取源端新增的表、字段以及数据信息(比如 Flink CDC  无法获取 PostgreSQL 数据库的 Schema 变更事件)。

2. 升级 debezium:修改 Flink CDC  源码, 升级 debezium 至最新版,获取 Oracle 新增表、字段事件。

3. SQL 形式过滤条件:支持 SQL 形式的过滤条件,and、or、in、>、<、between 等常用的表达式。

4. 不重启支持动态 Schema:不重启 Flink 任务,支持动态 Schema 及各种 DML 将数据写入目标端。

5. 重构 Flink 写入 Iceberg:没有使用现有的 Flink Datastream API 写入 Iceberg,重新使用 Iceberg 最底层 API 创建、修改表,插入、修改、删除数据。

6. 复杂业务逻辑:支持复杂的业务场景,要保证数据的正确性。

faee251bcd73965cb1cbf6e4ebd52be2.jpeg

这是我们在开发过程中,输出端遇到的第一个问题,也就是 SQL 条件的过滤。大家可能乍一听觉得很简单,加一个 where 条件就行了,但 Flink 任务在做数据同步时,它要求输入端和输出端的 Schema 需要预先提前知道,且它是固定不变的,但是我们的情况有一些不同,比如对于整库同步的过程中,用户新增了一些表,或者在表同步的过程中,新增了一些字段,Flink 现有的 collector 无法识别这些新增的信息,无法在未知的字段上添加 where 条件。那么我们要如何解决这个问题呢?

我们发送到中间 Kafka 缓冲层的数据格式和 debezium-json 的格式差不多,数据主要存储在 payload.after 和 payload.before 里面,这里面的数据的格式是 map 类型,它的 key 是字符串,value 是 object 类型的数据,但是这个格式我们无法把它映射成 Flink SQL,因为 object 类型在 Flink CDC  里面没有对应的类型,所以我们把 object 类型映射成了 string 类型,并对 SQL 进行了一些转换。使用 Flink SQL 解析器把 where 条件进行解析,然后重新生成新的过滤条件。

比如我们原始的过滤 SQL 是这样的:id between 1 and 3.5

经过我们的重构,变成了下面这个形式:cast(payload.after['id'] as DECIMAL(2,1)) BETWEEN ASYMMETRIC 1 AND 3.5

fdaeb1ec2cafa670834d942b3b24876f.jpeg

数据经过 where 条件的过滤之后,并且经过 UDF 函数转换进入 KeyedProcessFunction 函数进行处理。第一步先判断输出端的目标库和目标表是否已经存在。在没有存在的情况下,用纯 JDBC 的方式拼接 SQL 执行 DDL,创建数据库和表。然后进行数据处理,为了提高性能。我们把数据放到队列里,当队列达到一定的阈值后,进行 flush 操作,把数据批量写入数据库。

在这个同步过程中,对于 Schema 的处理和 Source 端一样,把获取的 Schema 信息放到 State 里,每来一条数据进行一次 Schema 对比。如果发生了变更,就能证明数据发生了 DDL 的操作。这个时候要刷数据,把队列里的数据 flush 到数据库,然后执行 DDL,执行完 DDL 之后重新拼接一个 INSERT INTO 的 SQL 执行新插入的数据。通过这种方式实现不重启 Flink 任务的情况下,同时支持 DDL(create、alter)和 DML(insert、update、delete)等一系列操作。

0a86af8c79c580b1b6b25bd86dc9f55c.jpeg

因为 Iceberg 无法用纯 JDBC 的方式写入,所以它无法跟关系型数据结合到一起。因此 Flink 写入 Iceberg 会遇到以下的一些问题。

1. Flink SQL 不支持 DDL。比如 Flink SQL 无法支持 Alter Table 的 DDL 语法。

2. Flink SQL 需预知 Schema。使用 Flink SQL 写入 Iceberg 表,需要提前知道表的 Schema 信息,且无法处理新增字段。

3. DataStream 需预知 Schema。如果使用 API 写入,也会和 Flink SQL 一样遇到同样的问题,写入也是需要提前预知表的 Schema 信息。

4. 提交 Snapshot。Flink 写入 Iceberg 是每次 Checkpoint 提交快照,但是我们需要自己控制,需要在发生 DDL 的时候触发提交。

a8cec682b9a908cf6f62381b2746ea6c.jpeg

我们发现 Flink 不管用 SQL 还是 API 的方式,都无法完成我们的需求,所以我们从更底层的角度来考虑实现方法,最后使用 Iceberg 很底层的 API 来实现我们所需要的功能。

比如 Create Table 就是使用 Iceberg 里的 Catalog 来创建 Table 的,包含一些主键和 Schema。其他的操作,包括修改表的 Schema、写入数据、提交快照等都是用纯 Iceberg 的底层 API 来实现,没有使用现有的 Flink Iceberg API 来做,这样实现起来更加灵活。

811fb2f035ec2e972dbcefcfb05b2a1b.jpeg

在业务上,我们也会面临很多复杂的业务场景,比如对同一字段,我们会有很多种操作。比如需要支持 UDF;对字段加过滤条件;字段的映射;添加常量字段;开启字段同步等等。所以我们在写逻辑的时候,要考虑各种各样复杂的条件。因为可能改了其中某一个功能进而就影响了其他功能。

04

生产实践

dabd5bcc1d6eea54174a7847de22829b.jpeg

我们系统上线后,目前已经服务于十几个客户,涉及到金融、能源等各个行业。支持的数据源包括 MySQL、PostgreSQL、Oracle、SQL Server 等。数据规模方面,目前客户用于同步的任务从几个到几十个库不等,每秒同步数千条数据。

ddb4efe3fe0d320705397ebcf9ac45de.jpeg

未来我们将在以下三方面进行提升:

第一,做一些性能提升。做一些压测,从各个角度提高系统的吞吐率和性能。

第二,希望有更多参数配置。比如 Kafka Sink 的各种 Topic 配置、Iceberg 的分区配置等等。

第三,希望有更多数据源的支持。

往期精选

7eae55dc3fb9018c665a306fd2990d35.png

d2757bbd87fd518edccdd8e9cfff2cd7.jpeg

48ce5b537921c25ebeea2d609560ea7f.jpeg

e47ac7db15c9e0da12e7b127d56fab07.jpeg

25ffcf78b46bef994573de626e952cb9.jpeg

▼ 关注「Apache Flink」,获取更多技术干货 ▼

819573b2f8e5e700aea34685a8248c82.png

 84d76145b990f72092e793f3c70e46cb.gif  点击「阅读原文」,查看更多赛事信息~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/403518.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Regmap API 实验

目录 一、Regmap regmap 结构体 regmap_config 结构体 regmap_config 掩码设置 二、Regmap 操作函数 1、Regmap 申请与初始化 2、 regmap 设备访问 API 函数 3、regmap_update_bits 函数 4、regmap_bulk_read函数 5、regmap_bulk_write 三、修改SPI实验 1、添加regmap…

Kubenates中的日志收集方案ELK(下)

1、rpm安装Logstash wget https://artifacts.elastic.co/downloads/logstash/logstash-6.8.7.rpm yum install -y logstash-6.8.7.rpm2、创建syslog配置 input {beats{port> 5044 } }output {elasticsearch {hosts > ["http://localhost:9200"]index …

【博客632】k8s service ession affinity原理与iptables recent模块

k8s service ession affinity原理与iptables recent模块 1、iptables recent模块 作用&#xff1a; iptables的recent模块用于限制一段时间内的连接数, 是谨防大量请求攻击的必杀绝技! 善加利用该模块可充分保证服务器安全。 recent常用参数&#xff1a; –name 设定列表名称…

Git使用教程:最详细、最傻瓜、最浅显、真正手把手教

GITGIT版本控制版本控制的意义分布式图形化客户端环境搭建仓库的操作分支使用场景命令远程仓库操作生成公钥命令冲突忽略列表的配置时机配置方式版本回退练习&#xff1a;GIT 版本控制 把文件系统中的文件&#xff0c;按照修改的版本进行记录&#xff0c;进行管理的操作。 版…

Spring Cloud ( Consul注册、发现 )

操作步骤&#xff1a; 安装Consul服务端在服务内添加客户端依赖修改配置类&#xff0c;添加注解编写yml文件一、安装Consul服务端 链接&#xff1a;https://www.consul.io/downloads.html&#xff0c;解压 开启cmd&#xff0c;进入你的Consul解压路径&#xff0c;我是在E盘 …

扩展欧几里得算法及其应用

前言 由于数论的板子真的很抽象&#xff0c;也很难背&#xff0c;所以特此记录扩展欧几里得算法的板子和它的用途 本篇文章只涉及应用&#xff0c;不涉及证明&#xff0c;如需理解证明还请各位移步其他优秀的讲解&#xff01; 扩展欧几里得算法 先粘一下板子的代码 typedef lo…

JAVA面向对象特征之——封装

4.封装 private关键字 是一个权限修饰符 可以修饰成员(成员变量和成员方法) 作用是保护成员不被别的类使用&#xff0c;被private修饰的成员只在本类中才能访问 针对private修饰的成员变量&#xff0c;如果需要被其他类使用&#xff0c;提供相应的操作 提供 “get变量名()…

面向数据安全共享的联邦学习研究综述

开放隐私计算 摘 要&#xff1a;跨部门、跨地域、跨系统间的数据共享是充分发挥分布式数据价值的有效途径&#xff0c;但是现阶段日益严峻的数据安全威胁和严格的法律法规对数据共享造成了诸多挑战。联邦学习可以联合多个用户在不传输本地数据的情况下协同训练机器学习模型&am…

【SpringCloud】SpringCloud详解之Feign远程调用

目录前言SpringCloud Feign远程服务调用一.需求二.两个服务的yml配置和访问路径三.使用RestTemplate远程调用(order服务内编写)四.构建Feign(order服务内配置)五.自定义Feign配置(order服务内配置)六.Feign配置日志(oder服务内配置)七.Feign调优(order服务内配置)八.抽离Feign前…

SNS (Simple Notification Service)简介

SNS (Simple Notification Service) 是一种完全托管的发布/订阅消息收发和移动通知服务&#xff0c;用于协调向订阅终端节点和客户端的消息分发。 和SQS (Simple Queue Service)一样&#xff0c;SNS也可以轻松分离和扩展微服务&#xff0c;分布式系统和无服务应用程序&#xf…

九龙证券|直逼1.5万亿!A股融资余额创年内新高,青睐这些行业和个股

2023年以来&#xff0c;A股商场震动重复&#xff0c;商场走势整体先扬后抑&#xff0c;各路资金看法纷歧&#xff0c;但数据显现&#xff0c;融资客在此期间整体持续净买入&#xff0c;未受到商场动摇的明显冲击&#xff0c;融资余额日前已迫临1.5万亿元&#xff0c;创出年内新…

磨金石教育摄影技能干货分享|烟花三月下扬州,是时候安排了!

人间三月最柔情&#xff0c;杨柳依依水波横。三月的风将要吹来&#xff0c;春天的门正式打开。对中国人来说&#xff0c;古往今来&#xff0c;赏春最好的地方是江南。人人都说江南好&#xff0c;可是江南哪里好呢&#xff1f;古人在这方面早就给出了答案&#xff1a;故人西辞黄…

使用高精度秒表StopWatch测试DateTime.Now的精度

StopWatch使用的命名空间&#xff1a;using System.Diagnostics;StopWatch的使用方法&#xff1a;创建Stopwatch对象&#xff1a;stopwatch&#xff1b;stopwatch计时表开启&#xff1a;stopwatch.Start();stopwatch计时表关闭&#xff1a;stopwatch.Stop();计算stopwatch.Stop…

【剧前爆米花--爪哇岛寻宝】进程的调度以及并发和并行,以及PCB中属性的详解。

作者&#xff1a;困了电视剧 专栏&#xff1a;《JavaEE初阶》 文章分布&#xff1a;这是关于进程调度、并发并行以及相关属性详解的文章&#xff0c;我会在之后文章中更新有关线程的相关知识&#xff0c;并将其与进程进行对比&#xff0c;希望对你有所帮助。 目录 什么是进程/…

redis布隆过滤器与四个缓存问题

目录布隆过滤器定义特性使用场景解决缓存穿透的问题黑白名单校验底层原理哈希冲突案例添加key查询key总结四个缓存问题缓存雪崩定义解决方案缓存穿透定义解决方案方案一方案二(guava实现)代码案例源码分析方案三(RedisSon实现)代码实现方案四(直接安装redis插件,应用层解决方案…

港科夜闻|香港科大与中国联通成立联合实验室,推动智慧社会研究发展

关注并星标每周阅读港科夜闻建立新视野 开启新思维1、香港科大与中国联通成立联合实验室&#xff0c;推动智慧社会研究发展。香港科大与中国联通于3月9日签署两份协议以加强战略合作&#xff0c;并成立「香港科技大学 - 中国联通智慧社会联合实验室」&#xff0c;就香港科大建构…

基于支持向量机SVM的风电场NWP数据预测,SVM的详细原理

目录 支持向量机SVM的详细原理 SVM的定义 SVM理论 Libsvm工具箱详解 简介 参数说明 易错及常见问题 SVM应用实例,基于SVM的风电场NWP预测 结果分析 展望 支持向量机SVM的详细原理 SVM的定义 支持向量机(support vector machines, SVM)是一种二分类模型,它的基本模型是定…

江苏专转本转本人后悔排行榜

江苏专转本转本人后悔排行榜 一、复习的太迟&#xff1a; 后悔指数:五颗星。 复习越到最后&#xff0c;时间一天天变少&#xff0c;要复习的内容还有很多&#xff0c;很多人都后悔没有早早开始&#xff0c;总想着多给我两月一定会考上的。 担心时间不够用&#xff0c;那就努力利…

【论文阅读】浏览器扩展危害-Helping or Hindering? How Browser Extensions Undermine Security

本文来源于ACM CCS 2022&#xff1b; https://dl.acm.org/doi/10.1145/3548606.3560685 摘要 “浏览器扩展”是轻量级的浏览器附加组件&#xff0c;使用各个浏览器特定的功能丰富的JavaScript api&#xff0c;为用户提供了额外的Web客户端功能&#xff0c;如改进网站外观和与…

【id:21】【20分】A. DS单链表--类实现

题目描述用C语言和类实现单链表&#xff0c;含头结点属性包括&#xff1a;data数据域、next指针域操作包括&#xff1a;插入、删除、查找注意&#xff1a;单链表不是数组&#xff0c;所以位置从1开始对应首结点&#xff0c;头结点不放数据类定义参考输入n第1行先输入n表示有n个…