新一代实时数据集成框架 Flink CDC 3.0 —— 核心技术架构解析

news2024/11/26 2:54:07

本文整理自阿里云开源大数据平台吕宴全关于新一代实时数据集成框架 Flink CDC 3.0 的核心技术架构解析,内容主要分为以下四部分:

  1. Flink CDC 演进历程

  2. Flink CDC 3.0 的架构设计

  3. Flink CDC 3.0 的核心实现

  4. 未来规划

一、Flink CDC 演进历程

Flink CDC 是基于数据库日志 CDC(Change Data Capture)技术的实时数据集成框架,配合 Flink 优秀的管道能力和丰富的上下游生态,Flink CDC 可以高效实现海量数据的实时集成。

在 2020 年 7 月,Flink CDC 作为一个基于个人兴趣孵化的项目合并了第一个 commit,拉开了 Flink CDC 实时数据集成的篇章,让用户只创建一个简单的 Flink SQL 作业就能完成 CDC 数据的同步、加工和分析。这个阶段里存在通过加锁保证一致性,并且不支持水平拓展的问题,Flink CDC 参考 DBLog 论文的设计,实现了无锁并发读取的全增量同步,完成了从 1.0 到 2.0 的升级。

Flink CDC 2.0 受到了广大用户的好评,不过在广泛应用的过程中,也暴露出了一些有待提升的地方,需要提升的部分主要包括通过 SQL 定义表结构的方式,在上游表发生加减列时需要手动调整作业;在整库同步的场景下需要为每一张表创建一个作业,占用连接多,计算资源消耗大等。在社区用户与开发者的共同努力下,Flink CDC 于 2023 年 12 月完成了 3.0 版本的功能落地,提供了强大的端到端的全增量同步、表结构变更自动同步、整库同步、分库分表同步等高级特性,有效地解决了用户的痛点。

二、Flink CDC 3.0 的架构设计

Flink CDC 3.0 的核心特性包括:

  1. 端到端数据集成,用户只需要配置一个 YAML 文件就能快速构建数据入湖入仓作业

  2. 完整的数据同步,全量读取结束自动同步增量数据,并且上游表结构变更自动应用到下游

  3. 一个作业实例支持读取和写入多表,占用数据库连接少,增量读取阶段自动关闭空闲读取器,节省计算资源

Flink CDC 3.0 的整体架构自顶而下分为 4 层:

  1. Flink CDC API:面向终端用户的 API 层,用户使用 YAML 格式配置数据同步流水线,使用 Flink CDC CLI 提交任务
  2. Flink CDC Connect:对接外部系统的连接器层,通过对 Flink 与现有 Flink CDC source 进行封装实现对外部系统同步数据的读取和写入
  3. Flink CDC Composer:同步任务的构建层,将用户的同步任务翻译为 Flink DataStream 作业
  4. Flink CDC Runtime:运行时层,根据数据同步场景高度定制 Flink 算子,实现 schema 变更、路由、变换等高级功能

三、Flink CDC 3.0 的核心实现

1. 数据抽象

Event 是 Flink CDC 3.0 内部进行数据处理及传输的数据结构接口,其作用类似于 Flink SQL 中的 RowData 接口。Event 目前所有的实现如下图所示。

1.1 ChangeEvent

ChangeEvent 接口代表着在一张表上发生过的变更事件,实现类包括数据变更事件(即 DataChangeEvent 类)和表结构变更事件(即继承 SchemaChangeEvent 接口的类)两种:DataChangeEvent 里保存了完整的数据变更信息,即包含了变更前(before)和变更后(after)每条记录的字段值;SchemaChangeEvent 有增加列、删除列、修改列类型等实现。

Flink CDC 把表结构变更信息当成一种事件流转,这样能够避免在数据变更事件里保存类型信息,需要从 DataChangeEvent 读取数据的节点会基于 SchemaChangeEvent 维护表结构信息。Flink CDC 还实现了自己的序列化器,每条记录使用二进制的方式存储在 Flink 的 MemorySegment 中,通过这种底层结构的优化设计,有效提高在不同节点之间数据流转的效率。

1.2 FlushEvent

FlushEvent 是包含数据刷写控制逻辑的特殊事件。当发生表结构变更事件后,之前的数据可能尚未处理完,链路上会并存两种不同表结构的数据。大部分数据库不允许直接在同一批次中混合处理两种表格式的数据,在处理新版本的数据之前,必须确保旧版本的数据已全部完成刷写操作。FlushEvent 作用是间隔这两种数据,在 Sink 端接受到 FlushEvent 后,就需要将之前缓存的数据全部刷写出去。

2. 算子编排

FlinkCDC 根据数据集成的场景,深度定制了 Flink DataStream 的算子链路,目前制定的数据处理链路如下图所示:

下面对这些模块的具体实现做进一步的介绍。

2.1 Source

Source 模块负责生产在链路中流转的变更事件。FlinkCDC 2.0 提供了强大的全增量同步、并发读取的能力,已经能够生成包含各类变更事件信息的 SourceRecord 对象,在此基础上,只需要再实现一个将 SourceRecord 解析成前面介绍的各种表变更事件的 DebeziumDeserializationSchema 自定义转换器,就能完成 FlinkCDC 3.0 数据源的接入。

在第一次启动时,Source 模块需要先拉取表结构信息,并生成 CreateTableEvent 发送到下游中,这是为了让下游节点能够解析 DataChangeEvent。

在 FlinkCDC 里,添加了丰富的 DDL 解析器来辅助数据库变更事件生成。具体来说,通过在 Alter 语句的解析树中每个规则(诸如语句、表达式和字面量等)的进入(Enter)和退出(Exit)阶段添加自定义逻辑,能够生成我们需要的各种 SchemaChangeEvent。以删除列的生成逻辑为例,在 CustomAlterTableParserListener 类的 enterAlterByDropColumn 方法中获取到被删除的列的列名,可以据此生成一个 DropColumnEvent 事件。

public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) {
    String removedColName = parser.parseName(ctx.uid());
    changes.add(new DropColumnEvent(currentTable, Collections.singletonList(removedColName)));
    super.enterAlterByDropColumn(ctx);
}
2.2 Transform

在当前版本暂未实现。

2.3 Schema

在发生表结构变更事件以后,Schema 模块负责阻塞上游数据的继续发放,直到旧版本格式数据刷写完毕。这个逻辑需要通过 FlushEvent 来传递,由于下游可能存在多个 Sink,需要通过运行在 JobManager 上的一个 OperatorCoordinator 来进行管控,这个 OperatorCoordinator 称为 SchemaRegistry。

具体来说,处理表结构变更的流程如下图所示:

  1. 在程序启动时,所有的 Sink Operator 都向 SchemaRegistry 注册,SchemaRegistry 记录 Writer 的数量。
  2. 在收到来自 Source 的 SchemaChangeEvent 时,SchemaOperator 发送一个包含本次表结构变更事件的 SchemaChangeRequest 给 SchemaRegistry,让 SchemaRegistry 缓存这个 SchemaChangeEvent。
  3. SchemaOperator 下发一个 FlushEvent 给所有的 Sink Operator,Sink Operator 接收到 FlushEvent 后刷写数据到外部系统,并且发送 FlushSuccessEvent 向 SchemaRegistry 进行汇报。SchemaRegistry 据此统计响应的 Writer 数量。
  4. SchemaOperator 下发 SchemaChangeEvent 给所有的 Sink Operator,让 Sink Operator 更新对应表的序列化器。
  5. SchemaOperator 发送一个 ReleaseUpstreamRequest 给 SchemaRegistry,并且开始阻塞自身,不再处理任何变更事件,直到收到 SchemaRegistry 的回应。
  6. SchemaRegistry 接收到 FlushSuccessEvent 以后,会和第 1 步中注册的 Sink Operator 进行比较,如果所有的 Sink Operator 都已刷写完毕,则开始将第 2 步中受到的 SchemaChangeEvent 应用到外部系统中,并且对第 4 步接收到的 ReleaseUpstreamRequest 进行回应。这样,SchemaOperator 就可以开始继续传递新的数据变更时和表结构变更事件了。
2.4 Route

Route 模块提供了表名映射的能力。通过为每一个源表中的数据设置其写入的目标表,通过一对一以及多对一的映射配置,我们能够实现整库同步和简单的分库分表同步功能。

Route 模块基于 Flink 的 RichMapFunction 实现,允许通过 source-table 指定一个正则表达式规则,将一系列符合正则表达式规则的表名,替换到另外一个由 sink-table 指定的表名。RouteFunction 的核心代码如下:

public Event map(Event event) throws Exception {
    ChangeEvent changeEvent = (ChangeEvent) event;
    TableId tableId = changeEvent.tableId();

    for (Tuple2<Selectors, TableId> route : routes) {
        Selectors selectors = route.f0;
        TableId replaceBy = route.f1;
        if (selectors.isMatch(tableId)) {
            return recreateChangeEvent(changeEvent, replaceBy);
        }
    }
    return event;
}
2.5 Partition

在数据同步场景,数据的生产和消费的速率常常是不匹配的,用户希望能够通过增加 Sink 的并发度来提高数据处理的速率。Partition 模块负责分发事件到不同的 Sink 中。

在 Partition 阶段,数据变更事件按照表名和主键作为哈希键,保证同一张表中相同主键的数据不会因数据分发出现乱序的情况。哈希键的计算方式如下所示:

public Integer apply(DataChangeEvent event) {
    List<Object> objectsToHash = new ArrayList<>();
    // Table ID
    TableId tableId = event.tableId();
    Optional.ofNullable(tableId.getNamespace()).ifPresent(objectsToHash::add);
    Optional.ofNullable(tableId.getSchemaName()).ifPresent(objectsToHash::add);
    objectsToHash.add(tableId.getTableName());

    // Primary key
    RecordData data =
            event.op().equals(OperationType.DELETE) ? event.before() : event.after();
    for (RecordData.FieldGetter primaryKeyGetter : primaryKeyGetters) {
        objectsToHash.add(primaryKeyGetter.getFieldOrNull(data));
    }

    // Calculate hash
    return (Objects.hash(objectsToHash.toArray()) * 31) & 0x7FFFFFFF;
}

同时由于 Sink 模块需要维护表结构信息,对于表结构变更事件,需要广播到每一个并发里。对于控制数据刷写的 FlushEvent,也需要广播到每一个下游的每一个通道里。

其代码实现如下:

public void processElement(StreamRecord<Event> element) throws Exception {
    Event event = element.getValue();
    if (event instanceof SchemaChangeEvent) {
        // Update hash function
        TableId tableId = ((SchemaChangeEvent) event).tableId();
        cachedHashFunctions.put(tableId, recreateHashFunction(tableId));
        // Broadcast SchemaChangeEvent
        broadcastEvent(event);
    } else if (event instanceof FlushEvent) {
        // Broadcast FlushEvent
        broadcastEvent(event);
    } else if (event instanceof DataChangeEvent) {
        // Partition DataChangeEvent by table ID and primary keys
        partitionBy(((DataChangeEvent) event));
    }
}
2.6 Sink

在 Sink 模块,需要将数据写出到外部系统中,并且将表结构变更应用到外部系统中。FlinkCDC 的 DataSink API 提供了 EventSinkProvider 和 MetaDataApplier 接口去完成这两件事情。

EventSinkProvider 用于将表数据变更应用到外部系统中。EventSinkProvider 要求提供一个基于 Flink SinkFunction 或者是 Flink Sink API 的实现,并且具备写出到多个表的能力。以 Flink Sink API 为例,SinkWriter 需要从 DataChangeEvent 中取出变更数据,并写出到对应的表中。当处理到 SchemaChangeEvent 时, SinkWriter 更新内存中保存的表结构信息。当处理到 FlushEvent 时, Sink Operator 会调用 SinkWriter 的 flush 方法将数据刷写出去。

MetaDataApplier 用于将表结构变更应用到外部系统中。在 SchemaRegistry 接受到所有的 Sink 算子处理完 FlushEvent 的通知后,由 SchemaRegistry 负责调用 MetaDataApplier 的 applySchemaChange方法去应用表结构变更事件。考虑到任务重启的情况,MetaDataApplier 需要支持对一个表结构变更事件幂等处理。

四、未来规划

Flink CDC 社区致力于持续深化数据同步与处理的全面性和灵活性。在 Flink CDC 3.0 里,针对数据集成场景定制了高效的数据格式和算子编排,实现了对表结构变更同步和整库同步的支持。基于未来的演进规划,社会将会着重关注完善 Transform 模块的功能,以满足用户对数据同步过程中的深度定制需求。计划在下一个大版本中,支持表结构动态调整,包括裁剪不必要的列、添加计算列等功能,以及提供数据过滤能力,让用户能够在同步过程中一站式完成复杂的数据转换任务。

此外,在连接器生态建设方面,社区正着手接入 Kafka、PostgreSQL 等业界主流的数据源,以及 Paimon、Iceberg 等先进的湖仓存储系统。进一步拓宽 Flink CDC 的上下游数据集成范围,推动上下游组件的深度融合。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1511827.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Web——HTML

一.HTML概述 超文本标记语言&#xff08;英语&#xff1a;HyperText Markup Language&#xff0c;简称&#xff1a;HTML&#xff09;是一种用于创建网页的标准标记语言。可以使用 HTML 来建立自己的 WEB 站点&#xff0c;HTML 运行在浏览器上&#xff0c;由浏览器来解析。 二.…

相机模型Omnidirectional Camera(全方位摄像机)

1. 背景 大多数商用相机都可以描述为针孔相机&#xff0c;通过透视投影进行建模。然而&#xff0c;有些投影系统的几何结构无法使用传统针孔模型来描述&#xff0c;因为成像设备引入了非常高的失真。其中一些系统就是全方位摄像机。 有几种方法可以制作全向相机。屈光照相机(D…

【Linux】yum及vim

目录 在Linux中安装软件&#xff08;以centos7为例&#xff09; Linux中的安装方式 安装的理解 什么是软件包&#xff1f; yum的一般使用流程 Linux开发工具 vi/vim 什么是vi/vim&#xff1f; vim的三大主要模式&#xff08;实际上还有很多模式&#xff0c;此篇不做介绍…

一款好用的AI工具——边界AICHAT(三)

目录 3.23、文档生成PPT演示3.24、AI文档翻译3.25、AI翻译3.26、论文模式3.27、文章批改3.28、文章纠正3.29、写作助手3.30、文言文翻译3.31、日报周报月报生成器3.32、OCR-DOC办公文档识别3.33、AI真人语音合成3.34、录音音频总结3.35、域方模型市场3.36、模型创建3.37、社区交…

焦点调制网络

摘要 https://arxiv.org/pdf/2203.11926.pdf 我们提出了焦点调制网络&#xff08;简称FocalNets&#xff09;&#xff0c;其中自注意力&#xff08;SA&#xff09;被焦点调制模块完全取代&#xff0c;用于在视觉中建模令牌交互。焦点调制包含三个组件&#xff1a;&#xff08;…

使用html+css制作一个发光立方体特效

使用htmlcss制作一个发光立方体特效 <!DOCTYPE html> <html lang"zh-CN"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>Documen…

SpringController返回值和异常自动包装

今天遇到一个需求&#xff0c;在不改动原系统代码的情况下。将Controller的返回值和异常包装到一个统一的返回对象中去。 例如原系统的接口 public String myIp(ApiIgnore HttpServletRequest request);返回的只是一个IP字符串"0:0:0:0:0:0:0:1"&#xff0c;目前接口…

第五篇【传奇开心果系列】Python的自动化办公库技术点案例示例:深度解读Pandas在教育数据和研究数据处理领域的应用

传奇开心果博文系列 系列博文目录Python的自动化办公库技术点案例示例系列 博文目录前言一、Pandas 在教育和学术研究中的常见应用介绍二、数据清洗和预处理示例代码三、数据分析和统计示例代码四、数据可视化示例代码五、时间序列分析示例代码六、数据导入和导出示例代码七、数…

【数据挖掘】练习1:R入门

课后作业1&#xff1a;R入门 一&#xff1a;习题内容 1.要与R交互必须安装Rstudio&#xff0c;这种说法对不对&#xff1f; 不对。虽然RStudio是一个流行的R交互集成开发环境&#xff0c;但并不是与R交互的唯一方式。 与R交互可以采用以下几种方法&#xff1a; 使用R Conso…

Qt/C++音视频开发69-保存监控pcm音频数据到mp4文件/监控录像/录像存储和回放/264/265/aac/pcm等

一、前言 用ffmpeg做音视频保存到mp4文件&#xff0c;都会遇到一个问题&#xff0c;尤其是在视频监控行业&#xff0c;就是监控摄像头设置的音频是PCM/G711A/G711U&#xff0c;解码后对应的格式是pcm_s16be/pcm_alaw/pcm_mulaw&#xff0c;将这个原始的音频流保存到mp4文件是会…

【企业战略转型】某音响制造公司发展战略转型管理咨询项目纪实

案例&#xff1a;【客户评价】日本M汽车音响有限公司田总经理&#xff1a;受经济大环境的影响&#xff0c;我公司原有的依赖企业下订单的业务模式受到很大的影响&#xff0c;企业进入“不进则退”的重要转型阶段。当企业生存的关键因素&#xff0c;我们作为典型的OEM汽车音响代…

unity学习(57)——选择角色界面--删除角色2

1.客户端添加点击按钮所触发的事件&#xff0c;在selectMenu界面中增加myDelete函数&#xff0c;当点击“删除角色”按钮时触发该函数的内容。 public void myDelete() {string message nowPlayer.id;//string m Coding<StringDTO>.encode(message);NetWorkScript.get…

前端之用HTML做一个汇款单

例子 代码 里面注释是我我对运用到的知识的理解 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>工商银行电子汇款单</title> </head> <body><h3>工商银行电子汇款单</…

python疑难杂症(10)---Python函数def的定义分类,包括内置函数、外置函数、匿名函数、闭包函数、生成器函数等

本部分详细讲解Python函数的定义、常见的函数类型&#xff0c;尤其是特色函数包括内置函数、外置函数、匿名函数、闭包函数、生成器函数等以及用法。后续将对这类函数重点讲解使用方法。 函数定义&#xff1a; 函数是大多数编程语言使用的一个概念&#xff0c;函数是一段具有…

题目 2610: 第十二届省赛真题-杨辉三角形

题目描述: 下面的图形是著名的杨辉三角形&#xff1a; 如果我们按从上到下、从左到右的顺序把所有数排成一列&#xff0c;可以得到如下 数列&#xff1a; 1, 1, 1, 1, 2, 1, 1, 3, 3, 1, 1, 4, 6, 4, 1, ... 给定一个正整数 N&#xff0c;请你输出数列中第一次出现 N 是在第几…

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的海洋动物检测系统(Python+PySide6界面+训练代码)

摘要&#xff1a;开发海洋动物检测系统对于海洋生态环境监控具有关键作用。本篇博客详细介绍了如何运用深度学习构建一个海洋动物检测系统&#xff0c;并提供了完整的实现代码。该系统基于强大的YOLOv8算法&#xff0c;并对比了YOLOv7、YOLOv6、YOLOv5&#xff0c;展示了不同模…

Shell常用脚本:hadoop集群启动、停止、重启脚本

脚本内容以我搭建的hadoop集群为例&#xff0c;你们自用的时候自行根据你们的情况进行修改即可 hadoop-cluster-manager.sh #!/bin/bash # 1. 调用此脚本前&#xff0c;请使用ssh-keygen -t rsa、ssh-copy-id -f 目标机器这两个命令使得目标机器是免密登录的 # 2. ssh远程执行…

在 Rust 中使用 Serde 处理json

在 Rust 中使用 Serde 处理json 在本文中&#xff0c;我们将讨论 Serde、如何在 Rust 应用程序中使用它以及一些更高级的提示和技巧。 什么是serde&#xff1f; Rust中的serde crate用于高效地序列化和反序列化多种格式的数据。它通过提供两个可以使用的traits来实现这一点&a…

【机器学习300问】33、决策树是如何进行特征选择的?

还记得我在【机器学习300问】的第28问里谈到的&#xff0c;看决策树的定义不就是if-else语句吗怎么被称为机器学习模型&#xff1f;其中最重要的两点就是决策树算法要能够自己回答下面两问题&#xff1a; 该选哪些特征 特征选择该选哪个阈值 阈值确定 今天这篇文章承接上文&…

因为manifest.json文件引起的 android-chrome-192x192.png 404 (Not Found)

H5项目打包之后&#xff0c;总是有这个报错&#xff0c;有时候还有别的icon也找不见 一通调查之后&#xff0c;发现是因为引入了一个vue插件 这个插件引入之后&#xff0c;webpack打包的时候就会自动在dist文件夹中产生一个manifest.json文件这个文件里面主要就是一些icon地址的…