基于 Flink 构建实时数据湖的实践

news2024/11/25 0:51:35

本文整理自火山引擎云原生计算研发工程师王正和闵中元在本次 CommunityOverCode Asia 2023 数据湖专场中的《基于 Flink 构建实时数据湖的实践》主题演讲。

实时数据湖是现代数据架构的核心组成部分,随着数据湖技术的发展,用户对其也有了更高的需求:需要从多种数据源中导入数据、数据湖与数据源保持实时与一致、在发生变更时能够及时同步,同时也需要高性能查询,秒级返回数据等。所以我们选择使用 Flink 进行出入湖以及 OLAP 查询。Flink 的批流一体架构、Exactly Once 保证和完善的社区生态提供了众多 Connector 可以满足前面的需求。Flink 也同样适合 OLAP 查询,这一点将在本文进行详细介绍。

整体架构

在基于 Flink 构建实时数据湖的整体架构中,底层使用 K8s 作为容器编排和管理平台。存储层支持 HDFS 或 S3。由于 Iceberg 良好的文件组织架构和生态,所以选择了 Iceberg 作为 Table Format。计算层则使用 Flink 进行出入湖,其中 Flink SQL 是最常用的出入湖方式,同时也用 Flink Datastream API 开发了一些高阶功能,出入湖的作业使用 Flink Application Mode 运行在 K8s 上。然后通过 Flink SQL Gateway 和 Session Mode 的 Flink Cluster 进行 OLAP 查询,提供了 JDBC 和 REST API 两种接口的返回结果。当然我们也需要使用 Catalog 管理元数据,这里不仅仅指 Iceberg 的元数据,还包括了其他第三方数据源的元数据,并利用定时任务进行后续的数据维护。

数据入湖实践

在数据入湖时 Flink 从左边的数据源获取数据,通过流或批的方式写入到 Iceberg 中。Iceberg 本身也提供了几种 Action 进行数据维护,所以针对每张表都会有数据过期、快照过期、孤儿文件清理、小文件的合并等定时调度任务,这些 Action 在实践过程中对性能的提升有很大帮助。

针对 Schema 固定,目的表也存在表到目的表的情形,通常使用 Flink SQL 进行数据导入和导出、可以写临时表,也可以把元数据存储到 Catalog 中,使用 Catalog Table 进行数据导入导入。但是为了满足客户更复杂的需求,在实践过程中我们基于 Datastream API 开发了 CDC Schema 自动变更,可以实现整库同步+自动建表的功能。

Flink SQL

Iceberg 社区支持了基本的写入和读取功能。Flink 1.17 引入了行级更新和删除的功能(FLIP-282),我们在此基础上增加了批量 Upate 和 Delete 操作,通过 RowLevelModificationScanContext 接口实现 Iceberg 的行级更新。实践过程中,通过在 Context 中记录了两个信息——事务开始时的 Snapshot ID,以及 UPDATE/DELETE 的过滤条件,用于保证批式 Update 和 Delete 的事务性。

Schema Evolution

Schema 演进是流处理中一个常见的问题,即通过在流作业过程中动态变更目的端的 Schema 保证数据的正确写入。Iceberg 本身对 Schema 变更有很好的支持。在 Iceberg 的存储架构中:Catalog 是不存储 Schema 的,只存储最新的 Metadata 文件位置。 Metadata文件存储着所有 Schema id 到 Schema 信息的映射,以及最新的 Schema id——Current-Schema-id。底下的每个 Manifest 记录一个 Schema id,代表 Manifest 底下的 Parquet 文件用的都是对应的 Schema。

如果 Iceberg 发生了 Schema 变更,Metadata 文件会记录新的 Schema,并把 Current-Schema-id 指向新的 Schema。后续启动的写入作业就会按照新的 Schema 去生成新的 Parquet 数据文件和对应的 Manifest 文件。读取时会根据最新的 Schema-id 对应读取,即使底层存在不同 Schema 的 Manifest 文件也会使用新的 Schema 信息进行读取。

目前 Iceberg 提供的 Flinksink 并不支持 Schema 变更,Iceberg 默认的 Flinksink 会给每一个需要写入的 Parquet 文件创建一个 Streamwrtier,而这个 Streamwriter 的 Schema 是固定的,否则 Parquet 文件的写入就会报错。上图示例中原始 Schema 是 id、name、age,在 Schema 匹配情况下的写入不会报错,所以 Row 1 可以写入;Row 2 写入时由于长度不符合,所以会报错:Index out of range;Row 3 写入时,由于数据类型不匹配,会报错:Class cast excetpion;Row 4 写入时虽然类型和长度都匹配,但 Schema 含义不同,最终会在结果文件中写入一条脏数据。

针对 Schema 变更要解决的问题主要有两个:1)怎么知道每条 Row 对应哪个 Schema?2)怎么在一个作业里写多种 Schema 数据?

针对第一个问题,在 Flink CDC Connector 中可以为每条记录设置包含 Schema 信息。所以我们需要实现一个反序列化方法,输出一条记录,包含 Row 和它对应的 Schema 信息,也就是图中紫色的部分,由此就解决了第一个问题。

针对第二个问题,支持多种 Schema 混写,需要为不同的 Schema 创建不同的 Streamwriter,每个 Streamwriter 对应一种 Schema。由此在 Iceberg Sink Connector 增加了新的 FlinkSchemaEvolvingSink,它会判断进入的数据是否和当前的 Schema 匹配,如果不匹配,就会向 Iceberg Commit 新的 Schema信息,返回 Schema id 后再按新 Schema 写数据、Commit 数据,即上图中对蓝色线条的描述,如果 Schema 是已经生成过的,就返回旧的 Schema id。FlinkSchemaEvolvingSink 中维护一个 Streamwriter 的 Map,其中 Key 是 Schema ID,当 Schema 传递过来之后会判断是否含有该 Schema的 Writer,如果没有就会去创建一个 Writer,这样就可以满足在同一个作业中写入多种 Schema 信息。

整库同步和自动建表

在 Flink 任务 Jobgraph 生成之前,需要一个 Catalog 模块读取源表的信息,同步在 Iceberg 端创建或者变更对应的目的表,同时在 Jobgraph 中增加对应表的 Sink 信息

在 Flink 作业运行过程中,每条 Binlog 记录会通过一个反序列化解析器生成一条记录,这条记录包含了 Tableid 和 Row 两部分内容,即图上紫色部分的记录。随后对这条记录进行 Split,将 Row 按照 Table id 拆开后再经过 Keyby Partition 操作后写入到下游表中。

整个流程主要由以下四部分组成:

  1. 反序列化器会解析 Event 事件和数据。为了防止在流转过程中 Class Cast Exception,数据类型需要保持和源 Schema 保持相同,这个就需要对每种类型做测试,通过使用 Flink CDC 里面的测试用例对每种类型进行比对。

  2. Catalog Module 主要负责自动建表和更新表内容,并需要和反序列化器保持一致的类型转换方式。

  3. Table Spilt 能够实现 Source 复用的功能,给每张表创建一个 Sideoutput Tag,并输出到下游。

  4. 因为 Iceberg Sink 会对每个 Partition 创建对应的 Fanout Writer,占用的内存很大。所以我们需要对表的 Partition 字段进行 Keyby 操作,用来减少 OOM 次数。因为 Iceberg 有隐式分区的特性,所以需要对隐式分区的字段 Transform 之后再进行 Keyby 操作。

数据查询实践

为什么选择 Flink

  • 在架构上,Flink 支持 JDBC 驱动程序、SQL-Gateway 和会话模式。Flink 会话集群是一个典型的 MPP (大规模并行处理)架构,每个查询不需要申请新的资源。用户可以通过 JDBC 驱动程序轻松提交 SELECT 语句,并在秒级甚至亚秒级取回结果。

  • 强大的批处理能力。Flink OLAP 可以采取许多批处理操作和优化。同时,OLAP 中也存在大量查询,Flink 可以根据 Flink 的批处理的能力支持它们,而不需要像其他 OLAP 引擎那样引入外部批处理引擎。

  • Flink 支持 QUERY/INSERT/UPDATE 等标准 SQL 语法,满足 OLAP 用户的交互需求。

  • 强大的连接器生态系统。Flink 为输入和输出定义了全面的接口,并实现了许多嵌入式连接器,如数据库、数据湖仓库。用户也可以基于这些接口轻松实现定制的连接器。

OLAP 架构

Flink OLAP 的整体架构,分为 Flink SQL Gateway 和 Flink Session Cluster 两部分。首先,用户使用 Client 通过 Rest 接口提交一个 Query,先经过 Gateway 的 SQL 解析和优化过程,生成作业的执行计划后通过高效的 Socket 接口提交给 Flink Session Cluster 上的 JobManager 到对应的 TaskManager 上,执行后将结果返回给 Clienht。JobManager 上的 Dispatcher 会创建一个对应的 JobMaster,之后 JobMaster 根据集群内的 TaskManager 按照一定的调度规则进行 Task 部署。

优化措施

Query 生成优化

  • Plan 缓存

第一个优化点是 Plan 缓存。在 OLAP 场景下,Query 有两个典型的特点:一个是业务上有很多重复的 Query,这一点和流式是不一样的,第二个特点是查询耗时的要求是秒级,我们通过分析发现,Plan 阶段的耗时有几十到几百毫秒,占比是比较高的。因此通过支持 Plan 缓存,对 Query 的 Plan 结果 Transformations 进行缓存,避免相同 Query 的重复 Plan 问题。

此外,也支持了 Catalog Cache 加速元信息的访问,以及 ExecNode 的并行 Translate,使 TPC-DS Plan 的耗时降低了 10% 左右。

  • 算子下推

第二个优化是算子下推,在存算分离架构下算子下推是一类非常重要的优化,它的核心思路是通过尽可能的将一些算子下推到存储层计算来大幅减少 Scan 的数据量,降低外部的 IO,同时也减少了 Flink 引擎需要处理的数据量,从而明显提升了 Query 的性能。

在字节内部的实践中,有一个典型的业务的大部分 Query 都取用 TopN 数据,因此我们支持了 TopN 的下推,从图中可以看出,把 Local 的 SortLimit 算子,也就是 Local 的 TopN 算子下推到了 Scan 节点,最终在存储层做 TopN 计算,以此大大降低了从存储读取的数据量。优化的效果非常明显,Scan 节点从存储读取的数据量降低了 99.9%,业务 Query 的 Latency 降低了 90.4% 左右。

除此之外,我们还支持了更多的算子下推,包括 Aggregate 下推,Filter 下推和 Limit 下推等。

Query 执行优化

  • ClassLoader 复用

在 ClassLoader 复用中首先来分析一个在 OLAP 下频繁创建 Classloader 引发 CPU 占用过高的问题。我们发现 JM/TM 的 CPU 占用在线上很高。通过火焰图分析,JVM 的 Dictionary::find 方法占据了 70% 以上的 CPU,在进一步分析 JVM 源码时发现 JVM 在加载了 Class 之后,为了加速从 Class Name 到 Classloader 的查找,会维护一个叫做 SystemDictionary 的哈希表(Key 是 Class Name,Value 是 Classloader 实例)。在 Classloader 数量非常多的时候,比如线上出现了超过 2w 的 Classloader 的情况下,哈希表中会存在大量的冲突,使查找的过程非常缓慢,即整个JM 大部分的 CPU 都会消耗在这个步骤中。

通过定位发现,这些 Classloader 都是 UserCodeClassloader,是用于动态加载用户的 Jar 包的,每个 Job 都会创建新的 UserCodeClassloader,从右下图中可以看出,新 Job 的 JobMaster 和 TM 上该 Job 的 Task 都会创建新的 UserCodeClassloader,导致 JM 和 TM 上的 Classloader 过多。除此之外,Classloader 过多还会导致 JVM Metaspace 空间不足,进而频繁的触发 Metaspace Full GC。

因此我们做了 Classloader 复用的优化,主要分为两步,首先优化了依赖 Jar 的方式,由于 OLAP 场景下依赖的第三方 Jar 包是相对固定的,可以直接放在 JM 和 TM 启动的 Classpath 下,并不需要每个作业单独的提交 Jar 包。接着对于每个作业在 JobMaster 和 Task 初始化时直接复用 System Classloader。经过 Classloader 复用之后,JM 中 Dictionary::find 所占的 CPU 使用从 76% 下降到 1%,同时,Metaspace Full GC 的频率显著降低。

  • CodeGen 缓存优化

这个优化的前提是我们发现了在 OLAP 下 Codegen 源代码编译占据 TM CPU 过多的问题,在当前的 Codegen 缓存流程中,Flink SQL 中大量算子使用了 Codegen 生成计算逻辑,比如 Codegen Operator 中的 Generated Class,其中的 Code 就是 Codegen 生成的 Java 源代码,在 Operator 初始化时,需要编译 Java 源代码并加载为 Class。为了避免重复的编译,当前已经有了缓存的机制,会根据 Class Name 映射到 Task 所用的 Classloader,再映射到编译好的 Class。

但是在当前的缓存机制下,存在两个问题,首先是当前的机制只实现了同一个作业内部,同一个 Task 的不同并发的复用,但是对于同一个 Query 的多次执行依然存在重复编译,这是因为 Codegen 生成 Java 源代码时为了避免命名冲突,代码的类名和变量名的后缀采用了进程级别的自增 ID,导致了同一 Query 的多次执行,类名和代码的内容都会发生变化,因此无法命中缓存。另外一个问题是,每次编译和加载 class 都会创建一个新的ByteArrayClassloader,频繁创建 Classloader 会导致 Metaspace 碎片严重,并引发 Metaspace Full GC,造成服务的抖动。

为了避免跨作业代码的重复编译,实现跨作业的 Class 共享,我们需要优化缓存的逻辑,实现相同源代码到编译好的 Class 的映射。这里有两个难点:

  1. 首先是如何保证相同逻辑的算子所生成的代码相同;

  2. 如何设计 Cache Key 唯一识别相同的代码。

对于第一个难点,我们在 Codegen 代码生成的时候,把类名和变量名中的自增 ID 从全局粒度替换为 Local Context 粒度,使得相同逻辑的算子能生成相同的代码。对于第二个难点,我们设计了基于 Classloader 的 Hash 值 + Class Name + 代码的长度 + 代码的 md5 值的四元组作为 Cache Key 来唯一识别相同的代码。

Codegen 缓存优化的效果是非常明显的,TM 侧代码编译的 CPU 使用率从之前的 46% 优化到 0.3% 左右,Query 的 E2E Latency 降低了 29.2% 左右,同时 Metaspace Full GC 的时间也降低了 71.5% 左右。

物化视图

  1. 首先用户通过 Flink SQL 向平台发送创建物化视图的请求;

  2. 平台负责创建 Iceberg 物化视图、启动 Flink 作业刷新物化视图,并且托管这个作业来保证它持续运行。

  3. Flink 刷新作业就会持续从源表流读增量数据,做增量计算得到增量的结果后流式写到物化视图。

  4. 最终用户就能直接通过查物化视图拿到原本需要做全量计算才能获得的结果。

以上就是实现物化视图主要的流程,目前我们的 Iceberg 物化视图还只是一个普通的 Iceberg 表,未来会在 Iceberg 层面记录更完善的元数据,用来支持判断数据的新鲜程度,也会基于已有的物化视图自动重写和优化用户的查询。其中定期数据维护会包括:过期数据清理、过期快照清理、孤儿文件清理、数据/元数据小文件合并等。

总结和展望

后续工作的重点将主要围绕自动化创建物化视图、物化视图的查询重写、自动调优数据维护任务的参数(包括执行频率、合并文件大小等),以及数据冷热分层/Data cache的相关工作展开。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1319851.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mysql高可用|索引|事务 | 调优

前言 「作者主页」:雪碧有白泡泡 「个人网站」:雪碧的个人网站 文章目录 前言sql语句的执行顺序关键词连接名字解释sql语句 面试坑点存储引擎MYSQL存储引擎 SQL优化索引索引失效索引的数据结构面试坑点 锁事务四大特性事务的隔离级别MVCC 读写分离面试坑…

以低成本实现高转化:搭建年入百万的知识付费网站的技巧与方法

明理信息科技知识付费平台 一、引言 随着知识经济的崛起,越来越多的知识提供者希望搭建自己的知识付费平台。然而,对于新手来说,如何以低成本、高效率地实现这一目标,同时满足自身需求并提高客户转化率,是一大挑战。…

POST:http://XXX:XXXX/XXXX/XXXX(404 Not found)离谱

很离谱,同样的请求方式,不同的接口会有404的问题。看下边: 上边接口访问正常,下边接口出现404.且本地测试也可以,代码也推到公司git上了。真的很离谱。 我也不知道怎么回事,无语||||||| 哪位兄弟知道啊&a…

4.配置系统时钟思路及方法

前言: 比起之前用过的三星的猎户座4412芯片,STM32F4的系统时钟可以说是小巫见大巫,首先我们需要清晰时钟产生的原理:几乎大多数的芯片都是由晶振产生一个比较低频的频率,然后通过若干个PLL得到单片机能承受的频率&…

2023_Spark_实验二十八:Flume部署及配置

实验目的:熟悉掌握Flume部署及配置 实验方法:通过在集群中部署Flume,掌握Flume配置 实验步骤: 一、Flume简介 Flume是一种分布式的、可靠的和可用的服务,用于有效地收集、聚合和移动大量日志数据。它有一个简单灵活…

LibreNMS:从docker出发

引言 LibreNMS 是一个免费开源的网络监控和自动化工具,用于监视网络设备、服务器和应用程序的性能和状态。它提供了一个集中的管理平台,帮助管理员实时监控和管理整个网络基础设施。 以下是 LibreNMS 的一些主要特点和功能: 自动发现&#…

20、清华、杭州医学院等提出:DA-TransUNet,超越TranUNet,深度医学图像分割框架的[皇帝的新装]

前言: 本文由清华电子工程学院、杭州医学院、大阪大学免疫学前沿研究所、日本科学技术高等研究院信息科学学院、东京法政大学计算机与信息科学专业共同作者,于2023年11月14号发表于arXiv的《Electrical Engineering and Systems Science》期刊。 论文&…

【Python基础】生成器

文章目录 [toc]什么是生成器生成器示例生成器工作流程生成器表达式send()方法和close方法send()方法close()方法 什么是生成器 在Python中,使用生成器可以很方便地支持迭代器协议生成器通过生成器函数产生,通过def定义,但不是通过return返回…

酷雷曼再获“国家高新技术企业”认定

2023年12月8日,《对湖北省认定机构2023年认定报备的第五批高新技术企业拟进行备案的公示》正式发布,酷雷曼武汉同创蓝天科技有限公司成功获评“国家高新技术企业”认定。 屡获权威认定,见证硬核实力 被评定为高新技术企业是我国企业最高荣誉…

武汉小程序开发全攻略:从创意到上线,10个必备步骤详解

在当前数字化时代,小程序已经成为企业营销和服务的重要工具。特别是在武汉这样的创新型城市,小程序开发更是备受青睐。本文将为您详细解读武汉小程序开发的全攻略,从创意到上线的10个必备步骤。 步骤一:确定小程序类型和功能定位…

DSP捕获输入简单笔记

之前使用stm32的大概原理是: 输入引脚输入一个脉冲,捕获1开始极性捕获,捕获的是从启动捕获功能开始计数,捕获的是当前的计数值; 例如一个脉冲,捕获1捕获上升沿,捕获2捕获下降沿;而两…

mysql自动安装脚本(快速部署mysql)

mysql_install - 适用于生产环境单实例快速部署 MySQL8.0 自动安装脚本 mysql8_install.sh(执行前修改一下脚本里的配置参数,改成你自己的)(博客末尾) my_test.cnf(博客末尾)(这个…

Linux性能优化常做的一些事情

Linux性能优化是一个广泛的主题,涉及多个方面。以下是一些常见的Linux性能优化建议: 硬件和系统配置: 使用SSD替代HDD。确保系统有足够的RAM。使用多核CPU。配置合适的网络硬件和带宽。 磁盘I/O性能: 使用RAID来提高I/O性能。使用…

WordCloud—— 词云

【说明】文章内容来自《机器学习入门——基于sklearn》,用于学习记录。若有争议联系删除。 wordcloud 是python的第三方库,称为词云,也成文字云,可以根据文本中的词频以直观和艺术化的形式展示文本中词语的重要性。 依赖于pillow …

物联网对接使用蓝牙还是WiFi,应该如何选择?

蓝牙是一种无线技术协议,可促进连接设备之间短距离的数据交换。它依赖于物理邻近性并使用2.400至2.485 GHz之间的UHF(超高频)无线电波。蓝牙旨在创建个人区域网络(PAN)并在笔记本电脑、智能手机和外围设备等计算设备之…

虚幻学习笔记18—C++委托(多播)和事件

一、前言 委托分单播和多播,多播就是可以绑定多个回调函数,然后一次性执行。这样也可以理解为啥多播没有返回值,多个回调函数执行后返回哪一个都是问题啊。而事件呢官方官方文档说法是“对于事件而言,只有定义事件的类才能调用 Br…

055:vue工具 --- 人民币小写转化为大写

第055个 查看专栏目录: VUE ------ element UI 专栏目标 在vue和element UI联合技术栈的操控下,本专栏提供行之有效的源代码示例和信息点介绍,做到灵活运用。 (1)提供vue2的一些基本操作:安装、引用,模板使…

2023年国赛高教杯数学建模D题圈养湖羊的空间利用率解题全过程文档及程序

2023年国赛高教杯数学建模 D题 圈养湖羊的空间利用率 原题再现 规模化的圈养养殖场通常根据牲畜的性别和生长阶段分群饲养,适应不同种类、不同阶段的牲畜对空间的不同要求,以保障牲畜安全和健康;与此同时,也要尽量减少空间闲置所…

读取小数部分

1.题目描述 2.题目分析 //假设字符串为 char arr[] "123.4500"; 1. 找到小数点位置和末尾位置 代码如下: char* start strchr(arr, .);//找到小数点位置char* end start strlen(start) - 1;//找到末尾位置 如果有不知道strchr()用法的同学&#xf…

Html+单页面引入element以及Vue框架引用地址报错(unpkg.com国内无法访问可代替方案)

问题 单页面引入element以及vue 地址报错,请求超时 使用的引用地址是官网上提供,但是没解决问题 一、原因: unpkg也没有幸免于难,也被墙了,unpkg上的相关资源都不能访问,才导致项目资源加载不出。 二、…