【flink】SinkUpsertMaterializer

news2025/1/19 10:17:25

在flink cdc同步数据时,基于sql的实现方式中发现了作业DAG有个SinkMaterializer算子,而且检查checkpoint历史时发现该算子state越来越大,
有必要搞清楚为什么会多了这个算子,作用又是什么。

通过算子名称定位到了源码为类org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer,这个算子将输入的记录以upsert key作区分保存到state中,
并为下游算子提供一下upsert视图。

An operator that maintains incoming records in state corresponding to the upsert keys and generates an upsert view for the downstream operator.

单纯看类注释和代码逻辑,并不能理解它的用处及设计背景。

设计背景

SinkUpsertMaterializer是为了解决changelog流事件乱序造成了结果不正确的问题。

示例

在分布式环境中,join, aggregate等操作经常会触发数据shuffling,可能会将source端同一主键记录的changelog分散到不同的下游算子中处理,造成数据处理乱序。

-- CDC源表:   
event: event_id BIGINT, dim_id BIGINT, PRIMARY KEY(event_id)
dim: dim_id BIGINT, name VARCHAR, PRIMARY KEY(dim_id)

-- 结果表: 
result: event_id BIGINT, dim_id BIGINT, name VARCHAR, PRIMARY KEY(event_id)

INSERT INTO result SELECT  event.*,dim.name from event JOIN dim  ON event.dim_id = dim.dim_id

两源表数据如何,event表只有1条数据,其中dim_id的值由10更新为11,所以整个流中产生了3条changelog数据。dim表中有dim_id为10,11的两条数据,没发生过修改。

eventdim
(+I,event_id=1,dim_id=10)
(-U,event_id=1,dim_id=10)
(+U,event_id=1,dim_id=11)
(+I,dim_id=10,name=dim10)
(+I,dim_id=11,name=dim11)

当event表和dim表根据dim_id进行关联时,changelog数据将以dim_id为upsert keys进行shuffling,得到以下情况

在这里插入图片描述

由于sink接收的数据来自两个上游算子,由于网络或者是处理速度原因,sink最终接收到数据的顺序并不确定,唯一能确定的是+I会在-U之前,因为两者具有相同的dim_id(10),最终会被同一个join task顺序处理,
即最终sink的数据可能是以下几种可能:

情况一情况二情况三
(+I,event_id=1,dim_id=10,name=dim10)
(-U,event_id=1,dim_id=10,name=dim10)
(+U,event_id=1,dim_id=11,name=dim11)
(+I,event_id=1,dim_id=10,name=dim10)
(+U,event_id=1,dim_id=11,name=dim11)
(-U,event_id=1,dim_id=10,name=dim10)
(+U,event_id=1,dim_id=11,name=dim11)
(+I,event_id=1,dim_id=10,name=dim10)
(-U,event_id=1,dim_id=10,name=dim10)
  • 情况一:sink顺序接收了changelog,最终得到正确结果(event_id=1,dim_id=11,name=dim11)
  • 情况二:sink最后接收到-U,造成event_id=1记录被删除
  • 情况三:同情况二

SinkUpsertMaterializer

SinkUpsertMaterializer位于sink算子之前,它通过将上游乱序数据以**upsert keys分区缓存在state中,同时为下游的sink提供一个正确的upsert视图。

原理

根据代码逻辑梳理出以下流程图

在这里插入图片描述

SinkUpsertMaterializer处理逻辑:

  • 如果row是+|或+U,则保存到state中,如果state中不为空,表示该row是+U,否则是+I(前提是+I不能和+U发生乱序!!!)
  • 如果row是-D或-U,则需要从state中删除与之对应状态的记录,如果删除后state为空,表示该key已经被删除,发送-D到下游;如果删除的记录为state中最后一条,则表示倒数第二条为该key当前最新的状态,将它标记为+U发送到下游

单纯从代码逻辑很难理解,结合上述的示例,看看会得到什么效果。

效果

分析情况二和情况三两种情况在SinkUpsertMaterializer中会产生什么效果

  • 情况二
  1. 接收(+I,event_id=1,dim_id=10,name=dim10)并发送到下游,state=[(+I,event_id=1,dim_id=10,name=dim10)]
  2. 接收(+U,event_id=1,dim_id=11,name=dim11)并发送到下游,state=[(+I,event_id=1,dim_id=10,name=dim10),(+U,event_id=1,dim_id=11,name=dim11)]
  3. 接收(-U,event_id=1,dim_id=10,name=dim10),删除state中第一个元素同时抛弃,state=[(+U,event_id=1,dim_id=11,name=dim11)]

乱序的-U最终在SinkUpsertMaterializer中就被丢了,并不会发送到sink,而最后发到sink的是+U,最终状态与state保持一致。

  • 情况三
  1. 接收(+U,event_id=1,dim_id=11,name=dim11)并发送到下游,state=[(+I,event_id=1,dim_id=11,name=dim11)]
  2. 接收(+I,event_id=1,dim_id=10,name=dim10)并发送到下游,state=[(+I,event_id=1,dim_id=11,name=dim11),(+U,event_id=1,dim_id=10,name=dim10)]
  3. 接收(-U,event_id=1,dim_id=10,name=dim10),删除state中最后一个元素,取第倒数第二个元素改+U往下发送(+U,event_id=1,dim_id=11,name=dim11)

情况三中,+I与+U乱序,-U与+U乱序,但是最终sink接收到的最后都是+U,数据正确。

使用方式

table.exec.sink.upsert-materialize配置项用于控制该算子的使用

  • FORCE:强制使用,无论什么场景
  • NONE:任何场景都不使用
  • AUTO:根据执行计划自动推断是否需要开启,当输入算子数据存在更新且upsert keys不存在于sink表主键中时启用。上述示例中source为changelog,event_id为主键,dim_id为upsert keys,符合条件,所以开启。

推断的逻辑位于代码org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor#analyzeUpsertMaterializeStrategy

当开启后,需要考虑state持续增大的情况,ttl受table.exec.state.ttl控制

参考

https://blog.csdn.net/qq_32727095/article/details/129876631
https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/653747.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

常用API(String,ArrayList)

1:String类概述 String是字符串类型,可以定义字符串变量指向字符串对象String是不可变字符串的原因?1.String变量每次的修改都是产生并指向新的字符串对象。2.原来的字符串对象都是没有改变的,所以称不可变字符串。 2:String创建…

一文搞懂VOS费率前缀、地区前缀的区别和使用

登录VOS3000客户端 进入费率管理 "VOS费率前缀"和"地区前缀"的主要区别如下: VOS费率前缀:VOS(Voice Over Service)费率前缀是指用于国际长途电话呼叫的特定前缀号码。不同的运营商或服务提供商可能会使用不同的VOS费率前缀,用于标识国际长途通话的费…

49 最佳实践-性能最佳实践-Nvme磁盘直通

文章目录 49 最佳实践-性能最佳实践-Nvme磁盘直通49.1 概述49.2 操作指导 49 最佳实践-性能最佳实践-Nvme磁盘直通 49.1 概述 设备直通技术是一种基于硬件的虚拟化解决方案,通过该技术,虚拟机可以直接连接到指定的物理直通设备上。对于用户来说&#x…

ByteV联合“智农”打造数字孪生高标准农田,助力乡村振兴

ByteV联合“智农”打造的数字孪生高标准农田,不仅要让粮食稳产、增产,更要对土壤肥力进行改良和提升。不仅能够实现科技引领农业发展,更在智慧农业的基础上实现一站式托管,真正做到技术提升、5G引领、建后管护的闭环管理。让高标准…

C语言之指针详解(7)

目录 本章重点 1. 字符指针 2. 数组指针 3. 指针数组 4. 数组传参和指针传参 5. 函数指针 6. 函数指针数组 7. 指向函数指针数组的指针 8. 回调函数 9. 指针和数组面试题的解析 上一篇博客我们说过会把回调函数的一些知识再给大家讲一遍 这里把void*强制类型转化为str…

【Hadoop】 | 搭建HA之报错锦集

知识目录 一、写在前面✨二、Hadoop的active结点无法主备切换🔥三、Hadoop Web端无法上传文件🍉四、hdfs创建文件夹报错🍭五、IDEA操作Hdfs无法初始化集群🔥六、Java无法连接Hdfs🍭七、找不到Hadoop家目录&#x1f525…

软件测试实战案例:支付功能板块如何测试?详细总结

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 大体上&#xff0…

ChatGPT数据安全隐患?本想提高效率,数据却遭泄露

一项新的研究发现,15%的员工经常在ChatGPT上上传公司数据,其中超过四分之一的数据被认为是敏感信息,这使公司在无形中面临安全漏洞的风险。 6月的研究报告《揭示真正的GenAI数据暴露风险》分析了超过10000名员工,主要研究员工如何…

(ICIP-2022)GAITTAKE:通过时间注意和关键点引导嵌入进行步态识别

GAITTAKE:通过时间注意和关键点引导嵌入进行步态识别 论文题目:GAITTAKE: GAIT RECOGNITION BY TEMPORAL ATTENTION AND KEYPOINT-GUIDED EMBEDDING 论文是华盛顿大学发表在ICIP 2022的工作 论文地址 ABSTRACT 步态识别是指根据远距离采集的视频数据&am…

【服务器数据恢复】OneFS文件系统下误删除文件的数据恢复案例

EMC Isilon存储结构: Isilon群集存储系统使用的是分布式文件系统OneFS。Isilon群集存储系统的每个节点均为单一OneFS文件系统,Isilon在进行横向扩展时不会影响数据的正常使用。Isilon群集存储系统所有节点在工作时都提供相同的功能,节点没有主…

js数组高阶函数——map()方法

js数组高阶函数——map方法 map()语法map()的基本使用map()的优缺点map()的使用场景去重双重for循环配合splie去重map循环配合Array.from去重set()去重filter…

es中索引那些事

0、前言 在了解倒排索引之前先理解下索引的作用: 查询数据的时候,最耗时的操作并不是CPU计算,也不是内存聚合,而是去磁盘将文档查到并拉取回来的过程。我们都知道在磁盘IO的过程中,顺序读写效率高于随机读写&#xf…

JavaWeb学习路线(4)——请求响应与分层解耦

一、概述 二、请求 (一)概念: 全名为HttpServletRequest,其目标是获取请求数据。 (二)简单请求: web端发送基本数据类型数据到服务器进行处理。 1、获取方式 (1)原…

ESP32学习之JSON,和接入心知天气

注意:手机热点或者网络不能开5.0GHz频段和WIFI6,不然ESP32连不上 心知天气账号(免费版即可),网站:心知天气 - 高精度气象数据 - 天气数据API接口 - 行业气象解决方案 (seniverse.com) V3的用户手册-天气实…

【业务功能篇29】Assert断言

业务场景: 当我们需要对一个接口方法验证是,在单元测试中,主要用于程序代码的调试或测试阶段 基本的使用就是assert condition,当 condition 为 true,就继续往下运行;当 condition 为 false,就抛…

MySQL - 第2节 - MySQL库的操作

1.创建数据库 创建数据库的SQL如下: CREATE DATABASE [IF NOT EXISTS] db_name [[DEFAULT] CHARSETcharset_name] [[DEFAULT] COLLATEcollation_name];说明: • SQL中大写的表示关键字,[ ]中代表的是可选项。 • CHARSET用于指定数据库所采用…

「实在RPA·证券数字员工」革新证券数字化现状

2022年1月《金融科技发展规划(2022——2025年)》提出“十四五”时期金融科技发展愿景,明确了金融科技发展的指导思想和4个基本原则、6个发展目标,确定了8项重点任务和5项保障措施,进一步明确金融科技发展方向。近年来&…

ADB WIFI 链接

ADB WiFi链接手机 必须在同一网络下(本人用的台式机网线手机连路由器WIFI) 1.先确认USB数据线是否成功链接了手机 adb devices不管前面设备是什么名字,但是后面必须为device状态才算链接成功了,offline状态是不行的 有些没开启…

Linux Debian Jenkins快速搭建配置并运行

Jenkins安装 参考Debian Jenkins Packageshttps://pkg.origin.jenkins.io/debian-stable/ 加Key curl -fsSL https://pkg.jenkins.io/debian-stable/jenkins.io-2023.key | sudo tee \/usr/share/keyrings/jenkins-keyring.asc > /dev/null 加仓库 echo deb [signed-by/u…

什么是远程工具,远程工具推荐

在当今数字化时代,远程工作正在变得越来越普遍。这种趋势不仅使企业管理更加便利,节省了时间和资源,同时也使员工更加自由和灵活。许多远程工作都需要使用到远程工具。本文将对远程工具进行简介和阐述。 什么是远程工具 远程工具是一种数字…