基于 Flink Paimon 实现 Streaming Warehouse 数据一致性管理

news2024/11/13 23:33:30

摘要:本文整理自字节跳动基础架构工程师李明,在 Apache Paimon Meetup 的分享。本篇内容主要分为四个部分:

  1. 背景

  2. 方案设计

  3. 当前进展

  4. 未来规划

点击查看原文视频 & 演讲PPT

一、背景

1

早期的数仓生产体系主要以离线数仓为主,业务按照自己的业务需求将数仓分为不同的层次,例如 DWD、DWS、ADS 等。在离线数仓中,业务数据会经过离线 ETL 加工进入数仓,层与层之间的数据转换也会使用离线 ETL 来进行处理。ADS 层可以直接对外提供 Serving 能力,中间层通常会使用 Hive 来存储中间数据。基于 Hive 也可以提供一些 OLAP QUERY 的能力。

在离线数仓生产体系下,优势是离线数仓的生产体系非常完整,工具链也比较成熟,存储和维护的成本比较低,对于用户的开发门槛相对也比较低。但劣势也非常明显,首先数据新鲜度非常低,通常是 T+1 级别,一般是小时级,甚至是天级。其次 changelog 支持不完善,虽然是面向Table开发,但中间存储 Hive 主要支持 append 类型的数据,同时离线 ETL 更适合处理全量数据,而不是增量更新。

2

随着数据量的增多,离线 ETL 的执行时间越来越长,同时业务对数据新鲜度的要求也越来越高。业务迫切的需要一种新的低延迟数仓生产体系。因此基于离线数仓进一步演进出了实时数仓生产体系。

比较典型的是 Lambda 架构的实时数仓生产体系。在 Lambda 架构的实时数仓生产体系中,业务需要维护两条链路,将生产链路分为了流处理层和批处理层。流处理层主要用于实时处理增量数据,作为批处理层的加速层,这层通常会选用 Storm、Flink 等实时计算引擎来进行数据处理。而中间结果则采用 Kafka 进行存储,以提供低延迟的流式消费能力。

批处理层和离线数仓相同,完成 T+1 的数据结果产出。服务层则会综合流处理层和批处理层的结果对外提供服务。

随着流式计算引擎的不断发展,以 Flink 为例,已经实现了计算层的流批统一,在一些场景中可以完全移除掉批处理层,由流处理层来完成全量+增量的计算。为了提供中间关键数据的 OLAP 查询能力,仍然需要将 Kafka 的数据再 Dump 到 Hive 中一份。

在实时数仓生产体系中,优势是数据新鲜度非常高,同时基于流处理层也可以做很多的预计算,来降低查询的延迟。

劣势也比较明显:

  • 第一,数仓的维护人员需要维护从计算到存储的两条技术栈完全不同的链路,开发和维护的成本都比较高。
  • 第二,存储成本高。Kafka 为了提供低延迟的流式消费能力,相比于离线常用的 HDFS,S3 等离线存储,存储的成本会更高。同时,为了让中间数据能够提供离线查询的能力,还需要额外存储一份离线的全量数据。
  • 第三,离线和实时链路的数据口径比较难对齐。这是因为采用了完全不同的两套技术栈在构建流处理层和批处理层。虽然逻辑抽象是相同的,但在具体实现上仍然有差别。并且流处理层的数据在不断地进行增量处理,和离线处理层很难基于固定的时间点进行结果对齐。
  • 最后在流处理链路中的中间结果,它是不可以被查询的,因为 Kafka 只支持流式顺序消费,没有点查、batch 查询的能力。虽然可以通过将 Kafka 数据 Dump 到 Hive 中一份,但实时性比较差。

3

尽管计算引擎已经实现了流批统一,但实时数仓其他的痛点很大程度是由于存储功能存在一定的限制而导致的。随着数据湖技术的兴起,一种新的存储技术产生了,它能支持高效的数据流批读写、数据回溯以及数据更新。基于数据湖可以构建出新的数仓生产体系——Streaming Warehouse。

在 Streaming Warehouse 中,每个中间表都被抽象为 Dynamic Table,能够同时支持流式和批式访问,为用户提供了和离线数仓相同的生产体验。基于 Streaming Warehouse 可以带来以下收益。

首先,为用户提供了统一的Table抽象,用户只需要维护一套 Schema。同时也统一了技术栈,大幅降低了业务的开发和运维成本。

其次,它采用了流批一体的存储,支持流式消费和 OLAP 查询,可以随时查询实时计算的中间结果。

最后,在保证数据新鲜度的情况下,存储成本相比实时数仓会更低一些。中间存储可以选用相对廉价的 HDFS 和 S3 这样的存储。

4

接下来我们对这三种数仓生产体系做一个整体的对比。

  • 在数据新鲜度方面,实时数仓和 Streaming Warehouse 的数据新鲜度是比较接近的,都是近似于实时的生产体验。

  • 在查询延迟方面,三种数仓生产体系的查询延迟都相对较低,但实时数仓的中间结果查询需要付出更多的成本,比如将中间结果需要导出到Hive等。

  • 在开发成本方面,Streaming Warehouse 和离线数仓的开发成本比较接近,它们的开发模式类似,可以很容易的进行开发和数据验证,门槛较低。实时数仓由于中间结果不可查,想要做 debug 和数据验证的成本开销会比较高。

  • 在运维成本方面,Streaming Warehouse 和离线数仓的运维成本也是比较接近的,因为它们的生产体系类似。对于运维人员,只需要维护一条链路,使用同一套技术栈。同时 Streaming Warehouse 和离线数仓都可以选择更廉价的离线存储,存储成本会更低一些。

5

那么思考一下 Streaming Warehouse 是否真的完全覆盖了我们的需求?

先来看一个业务场景,这是一个比较典型的商品订单关联计算的业务场景。在这个场景中,订单数据和商品数据会经过一些简单的加工,导入到 Streaming Warehouse 中的 ODS 层的表,也就是订单表和商品表。

然后订单表和商品表会进一步拼接为 DWD 层的商品订单明细表。最后对 DWD 层的表做一些聚合计算,产生 DWS 层的数据结果表。例如统计今天所有商品的营收,统计今天销售量 Top 10 的商品信息等。

在这样一个业务场景中,业务在数仓中可能也会进行一些常见的操作,比如业务可能会去修改订单表的字段。那么如果修改了订单表的字段,怎么去判断这次修改可能会影响到下游的哪些表呢?这反映出目前 Streaming Warehouse 中缺乏一个血缘管理的业务能力。

另外如果订单表数据出错了,如何去做生产链路的数据订正呢?在离线数仓中,可以很方便的进行任务重跑、Overwrite 等操作。在 Streaming Warehouse 中目前也可以很方便的去做这样的操作吗?

由于 Streaming Warehouse 是基于实时生产链路,所以不仅需要对这个表进行订正,还需要对它下游的表同时进行处理。在整个订正的过程中,数据的中间变化不应该被服务层可见。比如聚合结果已经到了 10,在订正的过程中,这个结果可能会回退到1,然后再逐渐累加到 10。

除了上述两个问题外,在进行 OLAP 查询时,如果想要分析 Top 10 商品在整个营收中所占的比重如何进行呢?如果是离线数仓,我们可以在两个表就绪之后进行 batch 查询。而在 Streaming Warehouse 中并没有就绪的概念,这两张表又来源于两个不同的任务,任务之间并没有任何的数据对齐的操作。当我们进行多表关联查询的时候,它的计算结果并不是完全一致的,缺少一个一致性的保证。

6

下面我们来总结一下在 Streaming Warehouse 中存在的问题。

  • 缺少血缘管理功能,包括表的血缘关系以及数据的血缘关系。表血缘关系是指这个表的上下游依赖,而数据血缘关系则是指这份数据来源于上游的哪些数据,同时下游基于这份数据生产出了哪些数据。

  • 缺少统一的版本管理能力。在离线数仓中,我们可以按照小时、天来对数据进行对齐。而在 Streaming Warehouse 中,由于我们都是流式进行处理,没有数据对齐、版本划分的概念,就会导致进行多表关联查询的时候缺少一致性的保证。

  • 数据订正困难。在进行订正的过程中,需要进行链路双跑、业务逻辑修正等大量的人工操作,运维成本较高。

基于以上的问题,我们提出了一个基于 Flink 和 Paimon 构建 Streaming Warehouse,并对外提供数据一致性管理的能力。

二、方案设计

7

下面我们介绍一下基于 Flink 和 Paimon 实现数据一致性管理方案的详细设计。

在一致性管理方案的整体设计中,主要包含两个部分。

  • 第一部分,建立上下游的血缘关系,我们会引入 System Database 来记录 Streaming Warehouse 中所有表和数据的血缘关系。同时,在任务提交以及数据生产的过程中,会自动的把表以及数据之间的血缘关系写入到血缘关系表中。

  • 第二部分,我们会在 Streaming Warehouse 中引入数据版本控制的能力,数据会按照版本来保持可见性,并且协调多表数据版本处理的一致性。

8

下面我们详细介绍一下这两部分的方案设计。

首先是血缘关系中的Table血缘关系管理。我们在 Streaming Warehouse 中引入了 System Database,并在这个 System Database 中创建了 Source 和 Sink 的血缘关系表。在任务的提交阶段,会解析这个任务使用到的 Table 表,并将这些信息记录到 Paimon 的血缘关系表中。

上图是我们的一个表结构,主要用来记录表和任务之间的关联关系。基于这个关联关系,我们可以构建出表与表之间的数据血缘关系。

9

在数据血缘关系中会为数据划分一个版本,并将版本信息记录到数据血缘关系的表中。目前我们以 Flink 的 Checkpoint 作为数据版本的一个划分标志,这是因为在 Flink 中目前 Paimon 表是依赖 Checkpoint 来实现数据提交的。

在 Flink 的 Checkpoint 制作成功之后,这意味着一个新的版本的数据产生了,我们会自动记录消费与生产之间的 Snapshot 的关系。

10

接下来介绍数据版本控制的设计,首先介绍一下基本概念。

  • 第一个概念是 Flink Checkpoint。这个是 Flink 定期用来持久化状态,制作快照的一个功能,主要用于容错、两阶段提交等。

  • 第二个概念是 Paimon Snapshot。在 Flink 制作 Checkpoint 的时候 Paimon 会产生 1 个或 2 个Snapshot,这取决于 Paimon 在这个过程中是否有进行过 Compaction,但至少会产生一个 Snapshot 来作为新的数据版本。

  • 第三个概念是 Data Version,也就是数据版本。计算引擎在计算的时候会按照数据的版本进行数据的对齐,然后进行处理,从而实现一个微批模式的处理。

目前,短期内我们是将 Paimon Snapshot 和 Data Version 两个概念进行了对齐,也就是说一个 Paimon Snapshot 就对应数据的一个版本。

11

先简单看一个数据对齐的示例。假设我们有 Job-A 和 Job-B,他们分别基于 Table-A 产出了自己的下游表 Table-B 和 Table-C。当 Job-C 想要对 Table-B 和 Table-C 进行关联查询的时候,它就可以基于一致性的版本去做自己的 QUERY。

比如 Job-A 基于 Table-A 的 Snapshot-20 产出了 Table-B 的 Snapshot-11。Job-B 基于 Table-A 的Snapshot-20产出了 Table-C 的 Snapshot-15。那么 Job-C 的查询就应该基于 Table-B 的 Snapshot-11 和 Table-C 的 Snapshot-15 进行计算,从而实现计算的一致性。

12

接下来介绍一下数据对齐的实现,它的实现分为两个部分。

  • 在提交阶段,需要去血缘关系表中查询上下游表的一致性版本,并且基于查询结果给对应的上游表设置起始的消费位置。
  • 在运行阶段,按照消费的 Snapshot 来协调 Checkpoint,在 Flink 的 Checkpoint Coordinator 向 Source 发出 Checkpoint 的请求时,会强制要求将 Checkpoint 插入到两个 Snapshot 的数据之间。如果当前的 Snapshot 还没有完全被消费,这个 Checkpoint 的触发会被推迟,从而实现按照 Snapshot 对数据进行划分和处理。

    13

在 Flink 的 Checkpoint 成功之后,它会通知Sink的算子来进行 Table 的 commit。在 commit 完成之后,这份 Snapshot 的数据就可以被下游可见了。此时会由 Commit Listener 将数据的血缘关系写入到 System Table 中,用来记录这份血缘关系。

14

当我们实现上面两个功能之后,具体有哪些应用场景呢?

  • 第一,数据血缘的自动化管理。数据血缘关系在整个数仓中是非常重要的一个部分。基于血缘关系我们可以快速的进行数据溯源,风险评估等。同时也可以基于血缘关系分析这些表的使用方、使用数量、数据走向,从而进行实际应用价值的评估。
  • 第二,查询一致性的能力,我们可以为 OLAP 查询自动按照数据版本来做数据对齐,并且保证查询结果的一致性。同时基于一致性数据进行开发和 debug,可以降低开发和运维成本,不再需要业务方手动进行多表对齐的操作。
  • 第三,数据订正。基于数据一致性管理以及数据血缘关系,可以简化数据订正的过程。首先按照血缘关系我们可以自动的创建下游需要订正的表的镜像表,然后再进行订正。可以提供两种订正方式,全量订正和增量订正。
    • 全量订正,可以基于一致性版本的数据从上游进行全量消费,产生一个全链路的新数据。在整个数据生产追上延迟之后,可以对表进行一个自动切换。
    • 增量订正,可以考虑和 Flink 的 Savepoint 机制相结合,从而不用再从零开始去初始化状态,减少需要回溯的数据量。

三、当前进展

15

下面我们介绍一下目前数据一致性管理的阶段性进展。

在社区里,目前我们发起了相关的 issue、PIP 以及邮件进行讨论,大家感兴趣的话可以关注一下相应的进展。如果有新的需求和想法的话,也欢迎大家一起来交流。

16

在字节内部,目前我们完成了一个 POC 版本的开发和测试。在这个版本中,我们提供了一个第三方的外部服务,用来管理血缘关系,协调数据版本等。

四、未来规划

17

最后介绍一下在 Streaming Warehouse 上的未来规划。

  • 第一,端到端延迟优化。在 POC 的过程中,我们发现端到端的延迟很大程度上取决于 Flink Checkpoint 的间隔,同时在内部收集一些业务需求的时候,业务对端到端延迟要求比较高。这样会带来一个问题,当我们降低 Checkpoint 的频率时,会导致比较多的小文件,这需要做一些权衡。下一阶段我们会着重解决端到端延迟的问题。
  • 第二,数据订正能力增强。目前这个是业务在实时数仓生产中反馈比较多的痛点,业务希望数据订正的成本可以足够低,同时订正过程产生的中间结果对外不可见。
  • 第三,状态复用。在数仓生产中有很多场景是多表关联。目前在 Flink 中,Join 算子会存储左右两条流的数据明细,在多表级联 Join 的场景下,每个 Join 算子都会存储之前的 Join 结果,相当于多存储了一次前面表的明细,会产生非常严重的状态膨胀的问题。业务希望这些状态可以被复用,也就是说相同表的数据只用被存储一份,这样的话可以大幅度的减少状态存储的开销。同时业务也希望这个中间状态是可以被查询的。假设这些状态可以被存储到 Paimon 的表中,采用 Lookup Join 的方式去访问。那么我们就可以使用 Flink 的 SQL 直接查询中间状态。

Q&A

问:血缘关系解析是基于 Flink 的 calcite 吗?

答:不是,是基于 FlinkTableFactory 进行实现,在创建 DynamicTableSource 和 DynamicTableSink 时,提取相关的 Table 信息和任务信息,然后写入到 Paimon 的血缘关系表中。

问:针对于任务出错,数据订正,具体是怎么操作的呢?也就是恢复正常的一个处理流程是怎么样的,大概需要多长时间能够恢复正常呢?

答:我们的目标是希望数据订正的流程可以在系统内自动完成,初期设想是在订正时,基于表的血缘关系对下游表产生相应的镜像表,然后将任务双跑在这条镜像链路上,基于数据血缘关系可以实现数据仍然按照相同的版本进行处理。在两条链路的延迟基本对齐时,进行任务以及表的切换。处理时间依赖处理的数据量,链路的复杂度等。

问:大佬有考虑在此基础上做一个统一的 Paimon 管理服务吗?例如 Paimon 的元数据管理,Compaction 管理,血缘管理等等

答:目前只考虑了实现元数据管理、血缘管理等,对于 Compaction 管理,可能更适合在 Table Service 这样的服务中进行。

问:业务周期跨度比较大,Flink Join 缓存全量的数据?

答:Flink 全量 Join 数据会在状态中存储 Table 的所有数据,同时对于级联 Join 会产生非常严重的状态膨胀问题。根据 Join 的原理,可以考虑将 Join 实现为 Lookup Join + Delta Join,对于历史数据,采用 Lookup join 去查历史表数据,而对于最近的增量数据,将其存储在状态中,通过状态查询进行 Join,这样可以将大量的全量数据存储在 Paimon 表中,状态里只缓存少部分数据。这依赖版本管理的能力来区分数据是 Join 历史数据还是增量数据。

问:字段血缘关系会做吗?要根据 SQL 语法解析的吧

答:暂时不考虑字段血缘关系的实现。

点击查看原文视频 & 演讲PPT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/834560.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Leetcode】二叉树的最近公共祖先,二叉搜索树转换成排好序的双向链表,前序遍历与中序遍历构造二叉树

一.二叉树的最近公共祖先 链接 二叉树的最近公共祖先 题目再现 『Ⅰ』思路一:转换成相交链表问题 观察上图,节点1和节点4的最近公共祖先是3,这是不是很像相交链表的问题,关于相交链表,曾经我在另一篇文章里写到过&a…

WPS Office AI实战:智能表单,信息收集神器

前面我们已经介绍了WPS里常用的文字、表格、演示文稿等等,在WPS AI的武装下重新发挥出智能化的威力,今天来聊聊表单的智能化应用会是什么样。 金山智能表单进行数据轻松收集,通过对话或拍照创建表单,回收结果还能自动生成数据报告…

回归预测 | MATLAB实现SO-CNN-BiGRU蛇群算法优化卷积双向门控循环单元多输入单输出回归预测

回归预测 | MATLAB实现SO-CNN-BiGRU蛇群算法优化卷积双向门控循环单元多输入单输出回归预测 目录 回归预测 | MATLAB实现SO-CNN-BiGRU蛇群算法优化卷积双向门控循环单元多输入单输出回归预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现SO-CNN-BiGRU蛇群算法…

浅谈管廊智能监控和报警系统设计探究

贾丽丽 安科瑞电气股份有限公司 上海嘉定 201801 摘要:综合地下管廊为我国城市的发展发挥了积极的推动作用,为了确保综合地下管廊基本功能得以真正的发挥出来,有必要将智能监控系统融入综合地下管廊智能管理系统构建中,以便于实…

windows物理机 上安装centos ,ubuntu,等多个操作系统的要点

一、摘要 一般情况下,我们的笔记本或工作电脑都默认安装windows 分几个区,当下是win7 win8 win 10 win11 等,突然我们有需求需要安装个centos ,后面我们应当怎么做,要点是什么?一定要根据网上的贴子一步步来…

【雕爷学编程】MicroPython动手做(37)——驱动LCD与图文显示3

MixPY——让爱(AI)触手可及 MixPY布局 主控芯片:K210(64位双核带硬件FPU和卷积加速器的 RISC-V CPU) 显示屏:LCD_2.8寸 320*240分辨率,支持电阻触摸 摄像头:OV2640,200W像素 扬声器&#…

python绘制六边形风车,用python画简单的风车

本篇文章给大家谈谈用python画简单的风车,以及python绘制六边形风车,希望对各位有所帮助,不要忘了收藏本站喔。 风车的动画,过程如下:1)绘制风车形状A,2)擦除风车形状A,3…

【LinearAlgebra】Chapter 12 - Linear Algebra in Probability Statistics

文章目录 Chapter 12 - Linear Algebra in Probability & StatisticsVariance (around athe mean) 方差(接近均值)Continuous Probability Distributions 连续概率分布Mean and Variance of p ( x ) p(x) p(x) p ( x ) p(x) p(x) 的均值和方差Norm…

idea-常用插件汇总

idea-常用插件汇总 码云插件 这个插件是码云提供的ps-码云是国内的一款类似github的代码托管工具。 Lombok Lombok是一个通用Java类库,能自动插入编辑器并构建工具,简化Java开发。通过添加注解的方式,不需要为类编写getter或setter等方法…

[MYSQL]查询单位时间消耗量

请求哪位大神给我优化一下mysql语句. 数据库表:(日/月/年 数据表和data_hour结构一样,懒得复制了,不然太长) DROP TABLE IF EXISTS data_source;#数据源 create table IF not EXISTS data_source ( num int unsigned not null auto_increment PRIMARY KEY COMMENT序号 …

关于win11 debian wsl 子系统安装启动docker一直starting,无法启动

首先我先说明,我的步骤都是按照官网步骤来的 通过官网的操作步骤 通过测试命令 sudo docker run hello-world得到下面的命令,我们通过启动命令 sudo service docker start 执行结果如下图 也就是说无法启动,一直显示在启动中 遇到这种情况…

Ubuntu安装git

使用 apt-get install git 安装git 报错: 这个错误信息通常表示您的系统上没有可用的 git 软件包。这可能是因为您的软件源列表中没有包含 git 软件包所在的软件源,或者您的软件源列表已经过期。 解决: 如果您使用的是 Ubuntu 或类似…

Vue2与Vue3响应式原理

Vue2的响应式 Vue3的响应式

(杭电多校)2023“钉耙编程”中国大学生算法设计超级联赛(5)

1001 Typhoon 计算几何 对于每一个避难点,计算其到所有线段的距离,取min即可 AC代码&#xff1a; #include<iostream> #include<algorithm> #include<cstring> #include<vector> #include<deque> #include<cmath> #include<cstdio&…

【云原生】K8S二进制搭建二:部署CNI网络组件

目录 一、K8S提供三大接口1.1容器运行时接口CRI1.2云原生网络接口CNI1.3云原生存储接口CSI 二、Flannel网络插件2.1K8S中Pod网络通信2.2Overlay Network2.3VXLAN2.4Flannel 三、Flannel udp 模式的工作原理3.1ETCD 之 Flannel 提供说明 四、vxlan 模式4.1Flannel vxlan 模式的工…

Packet Tracer - 配置初始路由器设置

Packet Tracer - 配置初始路由器设置 目标 第 1 部分&#xff1a;检验默认路由器配置 第 2 部分&#xff1a;配置并检验初始路由器配置 第 3 部分&#xff1a;保存运行配置文件 拓扑图 背景信息 在本练习中&#xff0c;您将执行基本的路由器配置。您将使用加密密码和明文…

HDFS架构刨析

HDFS架构刨析 概述HDFS架构图整体概述主角色&#xff1a;namenodefsimage内存元数据镜像文件edits log&#xff08;Journal&#xff09;编辑日志 从角色&#xff1a;datanode主角色辅助角色&#xff1a;secondarynamenode 重要特性主从架构分块存储机制副本机制namespace元数据…

京津冀特大暴雨,带给应急通信工作怎样的启示?

上个月&#xff0c;我发卫星基站中移集采那篇文章的时候&#xff0c;就提到&#xff1a;未来即将进入汛期&#xff0c;应急通信装备将发挥重要作用。 果不其然&#xff0c;没多久&#xff0c;超强台风“杜苏芮”形成并登陆福建&#xff0c;造成巨大破坏。 除了在南方地区的肆虐…

Java工程师研学之路【003Java基础语法下】

知识体系 挑战&#xff08;challenge&#xff09; 从终端输入字符串(输入的个数不超过10个)&#xff0c;当输入遇到end字符串时&#xff0c;结束输入并且打印出之前输入的所有字符串。 思路&#xff1a;首先要输出end之前的所有字符串&#xff0c;故可以使用数组把输入的字符…

zookeeper+kafka分布式消息队列集群的部署

目录 一、zookeeper 1.Zookeeper 定义 2.Zookeeper 工作机制 3.Zookeeper 特点 4.Zookeeper 数据结构 5.Zookeeper 应用场景 &#xff08;1&#xff09;统一命名服务 &#xff08;2&#xff09;统一配置管理 &#xff08;3&#xff09;统一集群管理 &#xff08;4&…