Flink cdc3.0动态变更表结构——源码解析

news2025/1/15 21:40:18

文章目录

  • 前言
  • 源码解析
    • 1. 接收schema变更事件
    • 2. 发起schema变更请求
    • 3. schema变更请求具体处理
    • 4. 广播刷新事件并阻塞
    • 5. 处理FlushEvent
    • 6. 修改sink端schema
  • 结尾

前言

上一篇Flink cdc3.0同步实例 介绍了最新的一些功能和问题,本篇来看下新功能之一的动态变更表结构的具体实现。
在 Flink 中,应用程序由流数据流组成,这些数据流是由用户定义的Operators进行转换。
在这里插入图片描述
Flink CDC 3.0 框架中流动的数据类型被称为Event,代表外部系统产生的变更事件。每个事件都标有发生更改的表 ID 。事件分为SchemaChangeEventDataChangeEvent,分别代表表结构和数据的变化。处理schema变更的Operators对应图中的SchemaOperator
在这里插入图片描述
(以下代码使用Flink Release 3.0.0)

源码解析

1. 接收schema变更事件

我们以添加字段触发的AddColumnEvent为例,它实现了SchemaChangeEventSchemaOperator 当接收到有AddColumnEvent 事件时,会在processElement 中调用handleSchemaChangeEvent处理。
在这里插入图片描述

2. 发起schema变更请求

说明下这里的response实际是直接返回的new SchemaChangeResponse(true), 由于构造的shouldSendFlushEvent 直接传入true, 所以后续也会进入if条件。我们接着requestSchemaChange 方法看
在这里插入图片描述
由于知道response是直接创建的已知结果,因此responseFuture.get() 也不会阻塞。我们接着来看toCoordinator.sendRequestToCoordinator(getOperatorID(), new SerializedValue<>(request));的实现
在这里插入图片描述

3. schema变更请求具体处理

通过几层的调用,上述变更请求会走到 SchemaRegistryhandleCoordinationRequest(CoordinationRequest request),我们的请求是SchemaChangeRequest,所以会调用requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
在这里插入图片描述
这里可以看到response 是直接创建的SchemaChangeResponse(true)。 接着schemaManager.applySchemaChange(request.getSchemaChangeEvent());注册新的schema。

在这里插入图片描述
另外还有个重点,在startToWaitForReleaseRequest方法中会重置responseFuture, 原本的response通过return返回了。而PendingSchemaChange中的response重置,主要就是为了等schema变更完成设计。(主线程会再次发起请求调用responseFuture.get() ,忽略这里会不理解后面为什么会阻塞)
在这里插入图片描述

4. 广播刷新事件并阻塞

回到第二部分,因为response是一个明确对象没有阻塞,返回后会直接广播FlushEventschemaChangeEvent(再次发起schemaChangeEvent不是很理解)。之后requestReleaseUpstream 请求调用responseFuture.get()会阻塞,因为response在第三步已经重置为new CompletableFuture<>(), 利用的1.8的特性。这也是收到变更事件后要保证sink端变更才能发放数据。
在这里插入图片描述

5. 处理FlushEvent

FlushEvent 由什么Operator处理,在官方架构图中其实没有指出,但是图标可以看出是通过sink端完成,我们可以找到DataSinkWriterOperator类,有对FlushEvent的处理。
在这里插入图片描述
实际调用SchemaRegistry::handleEventFromOperator方法,重点在requestHandler.flushSuccess(flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask());
在这里插入图片描述
其中applySchemaChange 就是在具体的sink端变更,下面会展开。 当变更完成后会执行waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));,实际就通知第4部分的response这里处理完了,可以正常放开数据流。
在这里插入图片描述

6. 修改sink端schema

每个sink端有自定义的metadataApplier
在这里插入图片描述
我们以DorisMetadataApplier为例,applyAddColumnEvent 会构造addFieldSchema,然后在schemaChangeManager 中转换为对应的sql执行。
在这里插入图片描述

结尾

以上就是这两天对源码跟进的记录,后续思考使用local环境Debug中间过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1435196.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【华为】GRE Over IPsec 实验配置

【华为】GRE Over IPsec 实验配置 前言报文格式 实验需求配置拓扑GRE配置步骤IPsec 配置步骤R1基础配置GRE 配置IPsec 配置 ISP_R2基础配置 R3基础配置GRE 配置IPsec 配置 PCPC1PC2 抓包检查OSPF建立GRE隧道建立IPsec 隧道建立Ping 配置文档 前言 GRE over IPSec可利用GRE和IP…

什么是MVVM模型

MVVM&#xff08;Model-View-ViewModel&#xff09;是一种用于构建 Web 前端应用程序的架构模式。它是从传统的 MVC&#xff08;Model-View-Controller&#xff09;模型演变而来&#xff0c;旨在解决界面逻辑与业务逻辑之间的耦合问题。 在传统的 MVC 架构中&#xff0c;View …

SQL 表信息 | 统计 | 脚本

介绍 统计多个 SQL Server 实例上多个数据库的表大小、最后修改时间和行数&#xff0c;可以使用以下的 SQL 查询来获取这些信息。 脚本 示例脚本&#xff1a; DECLARE Query NVARCHAR(MAX)-- 创建一个临时表用于存储结果 CREATE TABLE #TableSizes (DatabaseName NVARCHAR…

小白水平理解面试经典题目LeetCode 20. Valid Parentheses【栈】

20.有效括号 小白渣翻译 给定一个仅包含字符 ‘(’ 、 ‘)’ 、 ‘{’ 、 ‘}’ 、 ‘[’ 和 ‘]’ &#xff0c;判断输入字符串是否有效。 输入字符串在以下情况下有效&#xff1a; 左括号必须由相同类型的括号封闭。 左括号必须按正确的顺序关闭。 每个右括号都有一个对…

error getting ip from ipam: operation get is not supported on blockkey

无论是否通过注释指定ip&#xff0c;都不支持cni Claim操作。 查了好久。发现是版本问题&#xff0c;我的calico版本太老了。是3.5的calico &#xff0c;使用 kubernetes 数据存储时&#xff0c;不支持 Calico IPAM。 需要更新calico到3.6以上&#xff0c;支持 kubernetes 数…

【QT】opcuaServer 的构建

【QT】opcuaServer 的构建 前言opcuaServer实现测试 前言 在博文【opcua】从编译文件到客户端的收发、断连、节点查询等实现 中&#xff0c;我们已经介绍了如何在QT 中创建opucaClient 。在本期的博文中&#xff0c;我们基于之前的部署环境&#xff0c;介绍一下如何构建opcuaS…

【DDD】学习笔记-数据实现模型

SQL 与存储过程 倘若选择关系型数据库&#xff0c;组成数据实现模型的主力军是 SQL 语句&#xff0c;这是我们不得不面对的现实。毕竟&#xff0c;针对数据建模的实现者大多数担任 DBA 角色&#xff0c;他&#xff08;她&#xff09;们掌握的操作数据的利器就是 SQL。正如前面…

【算法与数据结构】583、72、LeetCode两个字符串的删除操作+编辑距离

文章目录 一、583、两个字符串的删除操作二、72、编辑距离三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、583、两个字符串的删除操作 思路分析&#xff1a;本题的思路和115、不同的子序列差不多&#xff0c;只是变成…

【RT-DETR有效改进】可视化热力图 | 支持自定义模型、置信度选择等功能(论文必备)

👑欢迎大家订阅本专栏,一起学习RT-DETR👑 一、本文介绍 本文给大家带来的机制是的是RT-DETR可视化热力图功能,热力图作为我们论文当中的必备一环,可以展示出我们呈现机制的有效性,同时支持视频讲解,本文的内容是根据检测头的输出内容,然后来绘图。 在开始之前…

本地部署TeamCity打包发布GitLab管理的.NET Framework 4.5.2的web项目

本地部署TeamCity 本地部署TeamCity打包发布GitLab管理的.NET Framework 4.5.2的web项目部署环境配置 TeamCity 服务器 URLTeamCity 上 GitLab 的相关配置GitLab 链接配置SSH 配置项目构建配置创建项目配置构建步骤构建触发器结语本地部署TeamCity打包发布GitLab管理的.NET Fra…

【BIAI】Lecture 13 - Language processing

Language processing 专业术语 Aphasia 失语症 fMRI 功能性磁共振成像 auditory cortex 听觉皮层 motor cortex 运动皮层 primary visual cortex 初级视觉皮层 permotor cortex 前运动皮层 课程概要 What is language 语言是一种用词汇按照语法规则组合来表示和交流信息的系统…

【教学类-46-05】吉祥字门贴5.0(华光彩云_CNKI 文本框 空心字涂色 ,繁简都可以,建议简体)

作品展示 背景需求&#xff1a; 1、制作了空心字的第1款 华光通心圆_CNKI &#xff0c;发现它不能识别某些简体字&#xff0c;但可以识别他们的繁体字&#xff08;繁体为准&#xff09; 【教学类-46-01】吉祥字门贴1.0&#xff08;华光通心圆_CNKI 文本框 空心字涂色&#xf…

nginx+flask+Gunicorn反代理服务拿不到真实IP的解决

背景 本人在宝塔linux环境&#xff0c;要部署flask的简单后端并且用Ngnix反代理&#xff0c;用Gunicorn框架部署。&#xff08;o(╥﹏╥)o中间磕磕绊绊总算部署上去了&#xff0c;需要了解Gunicorn怎么部署的朋友&#xff0c;评论区留言&#xff0c;我加补一篇介绍&#xff09;…

CentOS 8 安装配置 Hadoop3.3.6 伪分布式安装方式(适用于开发和调试)

1.配置服务器ssh免密登录&#xff0c;否则后面启动会报错&#xff1a;尝试通过SSH连接到主机出现认证错误的提示 配置服务器ssh免密登录&#xff1a; 1.生成SSH密钥对&#xff08;如果尚未生成&#xff09;&#xff1a; 执行下面的命令生成密钥对&#xff0c;一直回车即可 ssh…

爬虫实战--爬取简单文字图片并保存到mongodb数据库

文章目录 前言发现宝藏 前言 为了巩固所学的知识&#xff0c;作者尝试着开始发布一些学习笔记类的博客&#xff0c;方便日后回顾。当然&#xff0c;如果能帮到一些萌新进行新技术的学习那也是极好的。作者菜菜一枚&#xff0c;文章中如果有记录错误&#xff0c;欢迎读者朋友们…

七、类与对象

文章目录 类与对象1.1 自定义类1.2 第一个类1.3 private变量1.4 变量默认值1.5 构造方法1.6 类和对象的生命周期 类与对象 本文为书籍《Java编程的逻辑》1和《剑指Java&#xff1a;核心原理与应用实践》2阅读笔记 将客观世界中存在的一切可以描述的事物称为对象&#xff08;实…

浏览器提示ERR_SSL_KEY_USAGE_INCOMPATIBLE解决

ERR_SSL_KEY_USAGE_INCOMPATIBLE报错原因 ERR_SSL_KEY_USAGE_INCOMPATIBLE 错误通常发生在使用 SSL/TLS 连接时,指的是客户端和服务器之间进行安全通信尝试失败,原因是证书中的密钥用途(Key Usage)或扩展密钥用途(Extended Key Usage, EKU)与正在尝试的操作不兼容。这意味…

性能评测|虚拟化和裸金属 K8s 哪个性能更好?

本文重点 整体而言&#xff0c;SKS&#xff08;虚拟机 Kubernetes&#xff09;可以达到裸金属 Kubernetes 性能的 82% – 96%&#xff0c;满足绝大部分场景下生产容器应用的性能需求。更多虚拟化与裸金属 Kubernetes 架构、特性、适用场景与性能对比&#xff0c;欢迎阅读文末电…

【算法】枚举——蓝桥杯、日期统计、特殊日期(位数之和)、2023、特殊日期(倍数)、跑步锻炼

文章目录 蓝桥杯日期统计特殊日期&#xff08;位数之和&#xff09;2023特殊日期&#xff08;倍数&#xff09;跑步锻炼 蓝桥杯 日期统计 日期统计 如果暴力枚举100个数的八次循环那就是1016次运算&#xff0c;时间复杂度太高了&#xff0c;好在前四次的2023是确定的&#xf…

Mybatis中的sql-xml延迟加载机制

Mybatis中的sql-xml延迟加载机制 hi&#xff0c;我是阿昌&#xff0c;今天记录一下关于Mybatis中的sql-xml延迟加载机制 一、前言 首先mybatis技术本身就不多介绍&#xff0c;说延迟加载机制之前&#xff0c;那要先知道2个概念&#xff1a; 主查询对象关联对象 假设咱们现…