Flink CDC 提取记录变更时间作为事件时间和 Hudi 表的 precombine.field 以及1970-01-01 取值问题

news2025/2/24 18:05:13
《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

CDC 数据中的记录变更时间标记着这条记录在数据库中执行对应操作(创建/更新/删除)的时间,可以说是天然的“事件时间”,特别是对于那些本身没有记录时间字段的表来说就更加合适了。Flink 官方文档 也建议在使用 CDC 的情况下,优先使用 CDC 中的这个时间字段,这个时间更加精准。

与此同时,在定义 Hudi 表时,precombine.field 也是一个非常重要的配置,显然 CDC 数据中的记录变更时间是最合适的,没有之一。

CDC 数据中的记录变更时间属于元数据范畴,以 Flink CDC 的 MySQL 数据库为例,它提供四种元数据的抽取:

KeyDataTypeDescription
table_nameSTRING NOT NULLName of the table that contain the row.
database_nameSTRING NOT NULLName of the database that contain the row.
op_tsTIMESTAMP_LTZ(3) NOT NULLIt indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0.
row_kindSTRING NOT NULLIt indicates the row kind of the changelog,Note: The downstream SQL operator may fail to compare due to this new added column when processing the row retraction if the source operator chooses to output the ‘row_kind’ column for each record. It is recommended to use this metadata column only in simple synchronization jobs. ‘+I’ means INSERT message, ‘-D’ means DELETE message, ‘-U’ means UPDATE_BEFORE message and ‘+U’ means UPDATE_AFTER message.

其中的 op_ts 就是我们想要的,也就是:CDC 数据中的记录变更时间。我们可以在定义数据表时声明这个列,Flink CDC 可以将其提取出来作为普通字段供下游使用,就像下表中这样:

CREATE TABLE IF NOT EXISTS orders_mysql_cdc (
    `order_number` INT NOT NULL,
    `order_date` DATE NOT NULL,
    `purchaser` INT NOT NULL,
    `quantity` INT NOT NULL,
    `product_id` INT NOT NULL,
    `op_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    ...
);

注意,在定义 Flink CDC 源表时,op_ts 的数据类型是 TIMESTAMP_LTZ(3),不是 TIMESTAMP(3),写入下游表时,可以是 TIMESTAMP(3)

当我们初次使用这个 op_ts 字段时,你会发现,写入到的数据库的数据全部都是 1970-01-01 00:00:00.000,就像下面这样:

在这里插入图片描述

你可能会认为是哪里出错了,实际上,这是 Flink CDC 特别设计的,也是合理的,Flink CDC 官方文档的解释是:

If the record is read from snapshot of the table instead of the binlog, the value is always 0.

我们知道,Flink CDC ( 2.0+ ) 的一个显著特征是:它是全量 + 增量的一体化读取!全量就是经常说的历史数据,增量就是实时的数据,控制 Flink CDC 是从全部历史数据开始同步整个数据库还是从只当下的 binlog 中同步近期增量数据的配置项是:scan.startup.mode ( 官方文档 ),该配置项支持 5 种配置,而默认配置(initial)就是以当前分界点,数据中的现有数据使用全量方式读取(也叫快照读取),此后的数据从 binlog 中读取,这样就和上面描述的 op_ts 字段的取值吻合上了:

当 Flink CDC 使用全量方式读取表中的历史数据时,op_ts 字段全部取值为 0,即 1970-01-01 00:00:00.000,当 Flink CDC 使用增量方式读取 binlog 数据时,op_ts 字段的取值为数据发生变更的实际时间

这种设计还是非常合理的,因为,Flink CDC 本身在使用快照方式读取时,就没有任何变更时间可以读取,这个时间只在 binlog 中才有,而这对下游也不会造成太大的影响,因为此时的数据都是 insert-only 的数据,同一主键也不会出现两条记录,至少对 Hudi 表是没有影响的。

此外,作为一个“额外收获”,你会发现:op_ts 这个字段本身恰好标记了一条记录是通过全量同步进来的,还是增量同步进来的!


补充:以下是 Flink CDC 官方文档对 scan.startup.mode 5 种同步模式的解释:

The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer. The valid enumerations are:

  • initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
  • earliest-offset: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset.
  • latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
  • specific-offset: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be specified with binlog filename and position, or a GTID set if GTID is enabled on server.
  • timestamp: Skip snapshot phase and start reading binlog events from a specific timestamp.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1474788.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构:树/二叉树

一、树的概念 逻辑结构:层次结构,一对多 节点:树中的一个数据元素根节点:树中的第一个节点,没有父节点孩子节点:该节点的直接下级节点父(亲)节点:该结点的直接上级节点兄弟节点:有…

代码随想录算法训练营第44天|● 完全背包 ● 518. 零钱兑换 II ● 377. 组合总和 Ⅳ

文章目录 ● 完全背包卡码网:52. 携带研究材料-完全背包理论练习代码: ● 518. 零钱兑换 II思路:五部曲 代码:滚动数组代码二:二维数组 ● 377. 组合总和 Ⅳ思路:五部曲 代码: ● 完全背包 卡码…

第十二篇【传奇开心果系列】Python文本和语音相互转换库技术点案例示例:深度解读SpeechRecognition语音转文本

传奇开心果系列 系列博文目录Python的文本和语音相互转换库技术点案例示例系列 博文目录前言一、SpeechRecognition语音转文本一般的操作步骤和示例代码二、SpeechRecognition 语音转文本的优势和特点三、易用性深度解读和示例代码四、多引擎支持深度解读和示例代码五、灵活性示…

windows系统使用Vscode在WSL调试golang本地进程

背景: windows10企业版 vscodegolang1.20 wsl编译运行。 vscode 使用本地wsl进行进程attach操作,发现:Access is denied. 本地进程启动,vscode调试进程。windows-Linux控制台: Starting: C:\Users\book\go\bin\dlv.exe dap --l…

express+mysql+vue,从零搭建一个商城管理系统5--用户注册

提示:学习express,搭建管理系统 文章目录 前言一、新建user表二、安装bcryptjs、MD5、body-parser三、修改config/db.js四、新建config/bcrypt.js五、新建models文件夹和models/user.js五、index.js引入使用body-parser六、修改routes/user.js七、启动项…

vscode不能远程连接ubuntu18.04.6

目录 问题解决Portable Mode 安装vscode 补充说明学习资料 问题 vscode远程ssh连接ubuntu18.04.6时,出现如下提示框,单击Learn More后,定位到问题。Can I run VS Code Server on older Linux distributions? 原始是:需要glibc …

LeetCode 热题 100 | 图论(上)

目录 1 200. 岛屿数量 2 994. 腐烂的橘子 2.1 智障遍历法 2.2 仿层序遍历法 菜鸟做题,语言是 C 1 200. 岛屿数量 解题思路: 遍历二维数组,寻找 “1”(若找到则岛屿数量 1)寻找与当前 “1” 直接或间接连接在…

未来新质生产力Agent的起源与应用

Agent是什么? AI Agent的发展经历了从哲学思想启蒙到计算机科学助力、专家系统兴起、机器学习崛起、深度学习突破等多个阶段。如今,AI Agent已经成为人工智能领域的重要组成部分,为人类带来了巨大的便利和发展机遇。早在古希腊时期&#xff0…

《opencv实用探索·二十二》支持向量机SVM用法

1、概述 在了解支持向量机SVM用法之前先了解一些概念: (1)线性可分和线性不可分 如果在一个二维空间有一堆样本,如下图所示,如果能找到一条线把这两类样本分开至线的两侧,那么这个样本集就是线性可分&#…

关于年化收益率的思考

近期,对于投资的年化收益率有一些思考,想着将这些思考整理一下,顺便也就记录在这里。 1. 计算方式 年化收益率常见的计算有三种:算数平均,几何平均,IRR。 1.1 算术平均 算数平均用于度量产品的回报率&a…

【Java EE初阶二十六】简单的表白墙(二)

2. 后端服务器部分 2.1 服务器分析 2.2 代码编写 2.2.2 前端发起一个ajax请求 2.2.3 服务器读取上述请求,并计算出响应 服务器需要使用 jackson 读取到前端这里的数据,并且进行解析: 代码运行图: 2.2.4 回到前端代码,处理服务器返回的响应…

vue项目从后端下载文件显示进度条或者loading

//API接口 export const exportDownload (params?: Object, peCallback?: Function) > {return new Promise((resolve, reject) > {axios({method: get,url: ,headers: {access_token: ${getToken()},},responseType: blob,params,onDownloadProgress: (pe) > {peC…

Flutter(三):Stack、Positioned、屏幕相关尺寸、Navigator路由跳转

页面尺寸 通知栏高度:MediaQuery.of(context).padding.top顶部导航高度:kToolbarHeight底部导航高度:kBottomNavigationBarHeight屏幕宽:MediaQuery.of(context).size.width屏幕高:MediaQuery.of(context).size.height…

SpringMVC 学习(十)之异常处理

目录 1 异常处理介绍 2 通过 SimpleMappingExceptionResolver 实现 3 通过接口 HandlerExceptionResolver 实现 4 通过 ExceptionHandler 注解实现(推荐) 1 异常处理介绍 在 SpringMVC中,异常处理器(Exceptio…

项目解决方案:海外门店视频汇聚方案(全球性的连锁店、国外连锁店视频接入和汇聚方案)

目 录 一、概述 二、建设目标及需求 2.1 建设目标 2.2 需求描述 2.3 需求分析 三、建设方案设计 3.1 系统方案拓扑图 3.2 方案描述 3.3 服务器配置推荐 四、产品功能 4.1 资源管理平台 (1)用户权限管理 (2&#xff09…

AD9226 65M采样 模数转换

目录 AD9220_ReadTEST AD9220_ReadModule AD9226_TEST_tb 自己再写个 260M的时钟,四分频来提供65M的时钟。 用 vivado 写的 AD9226_ReadTEST module AD9226_ReadTEST( input clk, input rstn,output clk_driver, //模块时钟管脚 input [12:0]IO_data, //模块数…

LACP——链路聚合控制协议

LACP——链路聚合控制协议 什么是LACP? LACP(Link Aggregation Control Protocol,链路聚合控制协议)是一种基于IEEE802.3ad标准的实现链路动态聚合与解聚合的协议,它是链路聚合中常用的一种协议。 链路聚合组中启用了…

Linux运维-Web服务器的配置与管理(Apache+tomcat)(没成功,最后有失败经验)

Web服务器的配置与管理(Apachetomcat) 项目场景 公司业务经过长期发展,有了很大突破,已经实现盈利,现公司要求加强技术架构应用功能和安全性以及开始向企业应用、移动APP等领域延伸,此时原来开发web服务的php语言已经不适应新的…

云服务器ECS价格表出炉_2024年最新价格表——阿里云

2024年最新阿里云服务器租用费用优惠价格表,轻量2核2G3M带宽轻量服务器一年61元,折合5元1个月,新老用户同享99元一年服务器,2核4G5M服务器ECS优惠价199元一年,2核4G4M轻量服务器165元一年,2核4G服务器30元3…

高防IP简介

高防IP可以防御的有包括但不限于以下类型: SYN Flood、UDP Flood、ICMP Flood、IGMP Flood、ACK Flood、Ping Sweep 等攻击。高防IP专注于解决云外业务遭受大流量DDoS攻击的防护服务。支持网站和非网站类业务的DDoS、CC防护,用户通过配置转发规则&#x…