Flink CDC 在易车的应用实践

news2024/11/21 1:29:41

摘要:本文整理自易车数据平台负责人王林红,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分:

  1. Flink 应用场景
  2. DTS 平台建设
  3. Flink CDC + Hudi 应用实践
  4. 未来规划

点击查看直播回放和演讲 PPT

一、Flink 应用场景

Flink 在易车有丰富的应用场景,主要包含实时数仓建设和数据集成。

对于实时数仓建设,主要是数仓实时指标的开发,将离线指标逐步向实时指标过度;同时承接了公司各种实时大屏需求,并多次支持了公司内部的 818 购车节活动。

在实时监控方面,首先是日志监控,主要用来监控埋点情况,另外对于服务器日志,我们也进行统一收集和监控,监控服务响应和异常日志等,同时结合机器学习算法,做日志的聚类;在前端层面,监控前端接口超时、白屏等异常情况。最后也应用在一些业务实时监控、风控等场景。

在数据集成方面,使用 Flink 完成关系型数据库的实时接入,将 MySQL、SQL Server 等的数据实时接入到 Kafka 中;同时将 Kafka 的数据实时同步到 HDFS/Hive 中,实现数据的实时入仓、入湖;对于数据传输通道,我们使用 Flink 将 Kafka 的数据同步到下游存储引擎中,比如 TiDB、MySQL、ClickHouse、HDFS、Doris 等存储中,也实现一些异构数据源数据的实时同步。

二、DTS 平台建设

易车的数据集成主要分为两条线,一条离线数据集成,另一条是实时数据集成。本次主要介绍的是实时数据集成的演进过程,对于实时数据集成,最开始也是处在离线阶段,随着业务的发展,对数据的时效性越来越高,开始使用 Canal 同步 MySQL 的数据,然后使用 Spark 做微批计算,再之后引入 Flink,还是使用 Canal 同步接入 MySQL 数据,使用 Flink 进行数据的实时计算,再之后引入了 Flink CDC,基于 Flink CDC 做全增量一体化实时计算。

在使用 Canal+Flink 的早期阶段,整体流程如下,对于 MySQL 的数据,使用 Canal 通过解析 Binlog 的方式将数据同步到 Kafka 中,对于 SQL Server 的数据,通过 CDC 的方式同步到 Kafka 中,然后通过 Flink 进行加工计算,同步到下游系统如 HDFS 或 Kafka 中,在这个阶段,基本可以满足业务需求,也可以快速完成数据接入及后续开发。

但是也存在一些痛点问题,主要是,整个数据链路比较长,依赖的组件多,运维成本也比较高,另外 Canal 不支持全量数据的同步,全量和增量是割裂的两个阶段,并且对于不同数据源的接入,需要考虑不同的实时接入方案,维护也比较困难。

基于以上痛点问题,和我们的历史经验总结和评估,我们对数据集成工具提出了新的诉求。

  • 希望可以分布式地去支撑大数据场景,工具能够线性扩展,可以方便的对接更多数据源。
  • 希望用一个框架支撑流批一体的传输。
  • 希望基于一个开源框架来开发,这个框架需要和 Hadoop 的整个生态有比较好地集成。并且我们的终极目标,是用一套统一的技术架构来覆盖离线和实时的所有数据集成场景。

基于以上诉求,我们把方案锁定在 Flink 技术栈中,决定基于 Flink CDC 自研实现流批一体的数据集成服务。为什么选择 Flink CDC?

  • Flink CDC 引入了无锁算法,读取阶段全程无锁,降低了因加锁而带来的对线上数据库的影响风险,同时降低了对数据库的压力。支持并发读取,在全量数据同步阶段,可以更快地完成海量数据同步,可以通过水平扩展节点数或增加并行度的方式来加快数据处理速度、加速海量数据的处理。

    支持断点续传,全量阶段支持 Checkpoint,即使任务因某种原因退出了,也可通过保存的 Checkpoint 对任务进行恢复实现数据的断点续传。比如同步数据需要 1 天时间,但是同步任务运行 12 小时后失败了,不需要重跑整个数据同步任务,只需要从发生错误的位置重跑即可。

  • 支持丰富的数据源,目前支持 MySQL、SQL Server、Oracle、TiDB、MongoDB 等,也方便的实现异构数据源的数据同步和数据融合。
  • 端到端的一致性,支持 Exactly-Once 语义,保证全链路数据的准确性。
  • 无缝对接 Flink 生态,复用 Flink 众多 Sink 能力。

Flink CDC 支持了丰富的数据源,源头支持 MySQL、Mongo、TiDB、Oracle、SQL Server 等,目标端支持 kakfa、Hudi、TiDB、Hive、Doris、ClickHouse 等。

同时 Flink CDC 作为新一代的数据集成框架,不仅可以替代传统的 DataX 和 Canal 做实时数据同步,将数据库的全量和增量数据一体化的同步到消息队列或下游系统中;也可以用于实时数据集成,将数据库数据实时入湖入仓;同时还支持强大的数据加工能力,可以通过 SQL API 或 DataStrean API 对数据库数据进行实时关联、打宽、聚合等。

在 2.0 版本中,Flink CDC 对于 MySQL CDC 支持了无锁读取、并发读取、全程断点续传等高级功能,实现 MySQL 数据的增量快照读取,在最新的 2.2 版本中,将增量快照读取算法抽象成了公共框架,也方便其他 Connector 的接入,其他 Connector 只需要接入这个框架就可以提供无锁算法,并发读取和断点续传的能力,十分方便其他连接器的扩展。

所以我们基于 Flink CDC 构建了 DTS 数据传输平台,在源端,目前已经集成支持了 MySQL、SQL Server、TiDB、Mongo、kafka 等数据源,在目标端,也集成了 Hudi、kafka、Doris、ClickHouse、HDFS、Hive 等数据源,方便业务进行数据实时入湖入仓,和异构数据源的传输、同步。

但是,在 DTS 平台建设过程中,我们也遇到了一些问题,比如元信息的字段映射,如何方便安全的将源库的字段类型映射成 Flink 的字段类型;在任务运行过程中,如何动态的增加新的同步表,包括如果业务源库字段变更了,下游系统如何处理?另外随着任务的增多,如何更好的对数据源信息进行维护,如一个业务库迁移,如何优雅的对任务中的数据源信息进行变更?

首先说元信息自动映射的问题,Flink CDC 支持丰富的数据源,这些数据源都需通过手工的方式映射成 Flink 的 DDL。手工映射表结构是比较繁琐的,尤其是数据源头多、映射关系比较复杂,每种数据源都有自己的映射关系,当表和字段数非常多的时候,手工映射也非常容易出错,对用户不友好,开发效率也不高。

为了解决上述问题,我们开发了统一数据源服务,我们将平台中使用到的数据源统一注册到数据源系统中,实现数据源的统一维护管理,同时实现表结构变更通知,影响分析等。

用户在实时计算平台创建表和创建同步任务时,选择对应的数据源,自动获取表的 Schema,通过模版化的方式创建表和数据同步任务,同时使用数据源 ID 对用户屏蔽连接串和账号密码信息,提升账号安全性。

另外数据源信息与任务信息关联,数据源变更或迁移,只需要修改数据源信息,降低修改成本。最后离线层的数据接入也依赖于统一数据源,离线和实时使用同一套元数据,便于流和批模型的统一。

上图是数据源改造前后的一个对比图,前面是原生的 MySQL 的流表,需要连接串、账号、密码信息,统一数据源之后,在链接串中只需要关注数据源 ID。

同时我们对 Connector 进行了改造,任务在执行时,会将具体的数据源 ID 替换为真实的链接串、账号和密码,对于 Kafka 流表也一样,在表创建时,只需要数据源 ID,任务执行时,会替换为 Kafka 的 Server 地址,对于 groupID,在任务中,通过 set 的方式进行设置。这种方式也方便我们进行后续 Kafka 集群的主备切换。对于其他的数据源,比如 Hbase、HDFS 等也做了类似的改造。

上图是我们自动建表的页面,选择对应的数据源,需要映射的源表,通过数据源服务自动获取源表的 Schema,自动做字段类型映射,通过模版化的方式,一键生成 Flink 的建表语句。

上图是数据同步的配置界面,用户主要选择对应的源和目标数据源、数据表,自动做字段映射,如果目标表不存在,会自动创建目标表。

得益于 MySQL CDC 动态加表功能,也可以在已有任务中,直接增加需要同步的表,添加的表会自动先同步该表的全量数据,然后再无缝切换到同步增量数据。遇到新增监控表时不用新起作业。同时也支持通过正则的方式配置分库分表的同步,另外对于源表字段变更,类型变更等,也做了一些适配。

接下来介绍下平台的整体架构,我们对 DTS 平台、调度平台、实时平台进行了深度的整合和集成,任务调度层集成到统一调度平台中,实现任务的统一管理,主要包含任务的运维管理、权限管理、资源管控、监控告警、和变量管理等。

对于实时平台,主要关注任务的开发、运维、和治理。

  • 对于任务开发,提供 WebIDE 给用户进行任务的开发调试,同时提供语法智能校验、检测功能,便于用户发现代码语法问题。
  • 对于任务运维,提供任务诊断、评分、健康检查、日志收集、作业快照(Checkpoint、Savapoint)、自动拉起、批量重启等功能,同时支持任务的容灾恢复。
  • 对于任务治理,主要包含全链路的任务血缘和表血缘,方便的了解任务和表的血缘关系和对任务进行影响分析,还有数据层面质量监控,包括断流、数据量异常波动、数据比对等。

实时计算平台上主要支持 SQL 任务、Jar 包任务和 DTS 任务。对于 SQL 任务和 Jar 包任务,提供版本管理、资源管理:资源管理主要是将表、UDF、Connector 等资源统一管理,并通过模版化的方式和配置化的方式完成 Source、Sink 表的创建,降低用户开发成本;对于 TDS 任务,提供数据源管理、任务配置和数据校验等一些模块和服务。

通过服务平台化,打造一站式的任务开发管理平台,在平台上完成任务从开发到测试、发布、监控的全流程处理操作,降低用户使用平台的门槛。

对于核心的 DTS 数据传输架构如上,整个架构主要是基于 Flink 1.14 的 DataStream API 和 Flink CDC 2.2 构建,覆盖流批的场景,实现各种同步需求。

整个架构主要包含 Source 端、数据传输层、Sink 端, Source 和 Sink 端抽象出 SourceFactory 和 SinkFactory,方便实现对接各种类型数据源,在数据传输层,提供统一的基础服务框架,支持类型转换、自定义监控指标、数据校验等功能,如类型转换,DTS 中也支持了对 Canal 数据格式的适配。

目前 DTS 支持了公司内大部分数据传输管道,涵盖数据库,如 MySQL、SQL Server 和 TiDB 等;消息队列,如 Kafka、RocketMQ 等;以及大数据生态系统的各种组件,例如 HDFS、Hive、ClickHouse、Doris 等,覆盖了易车大部分实时流场景和少数离线场景。

这套数据集成架构如今在易车内部已稳定运行近一年时间,服务于众多产品线,整套架构对数据集成,有很大的收益。

  • 统一了技术栈,通过 Flink 可以完成数据异构数据源的实时集成,同时支持流批一体。
  • 通过平台化的操作,降低了数据接入、任务运维等的复杂度,也无需额外部署 Canal 等组件,降低运维成本,链路稳定性也得到了提升。
  • Flink CDC 全增量一体化的框架,解决了在数据集成方面全量、增量隔裂的痛点问题,实现了全增量一体化的数据集成。

三、Flink CDC + Hudi 应用实践

Flink CDC 的一个主要应用场景就是数据实时入湖,对于数据湖我们主要使用的 Hudi,Hudi 的主要特点如下:

  • Hudi 的 upsert 功能支持的比较成熟。
  • Hudi 的表文件可以存储在 HDFS 上,兼容 Hadoop 生态圈,可以使用 Hive、Spark、Presto 等引擎查询 Hudi 表。
  • Hudi 表的组织模式也很灵活,可以根据不同场景选择不同的表模式。
  • Hudi 已经集成了 Flink,便于我们计算引擎的统一。最后 Hudi 也有相对比较活跃的社区。

使用 Hudi 后,在没有引入 Flink CDC 之前,我们的数据入湖架构如下:

首先使用 Canal 通过解析 Binlog 的方式将增量数据同步到 Kafka 中,然后通过 DataX,将 MySQL 的全量数据同步到 HDFS 上,然后使用 bulk_insert 的方式初始化数据到 Hudi 中,完成全量数据的初始化,最后,使用 Flink 消费 Kafka 的数据,将数据写入到 Hudi 中,同时通过主键解决数据冲突问题。

大家可以看到,这个架构整体的链路比较长,操作频繁、维护成本比较高,涉及的组件比较多,对于完成数据接入工作量比较大,并且稳定性不好保证,如果一旦有数据问题,数据恢复、重导也是一件比较痛苦的事情。

在使用 Flink CDC 之后,结合 DTS 平台,架构如上。在 DTS 平台中,很方便的可以实现 MySQL 数据一键入湖,并且得益于 Flink CDC 全增量一体化框架,不用考虑全增量问题,同时也支持动态增加表功能,操作非常简单。

随着接入表的增多,对于同一个数据源下的数据同步任务,建立了过多数据库连接,导致 Binlog 重复读取,会对数据源库造成巨大的压力。另外有些 Task 同步的数据量很小,也会造成一定的资源浪费。

为了解决这个问题,我们使用 API 的方式读取数据,通过侧输出流的方式对 DataStream 进行分流,实现合并 Source 的功能。对于读取的同一数据源,同一任务只会建立一个数据库连接,Binlog 也只会读取一次,降低了对数据库的压力,方便的实现了单任务多表的数据实时入湖。

在数据实时同步写入 Hudi 时,Flink Hudi 的写入 Pipeline 算子如下。

第一个算子负责将快照数据+增量数据加载到 Flink 状态。接着经过一个 Bucket Assigner,它主要负责将已经转好的 HudiRecord 分配到特定的 File Group 中,接着分好 File Group 的 Record 会流入 Writer 算子执行真正的文件写入。再之后会接一个 Compaction 的算子,主要用来解决 MOR 表读放大的问题。

这个架构在实际的生产环境会遇到如下问题:

  • 当数据量比较大的时候,Flink State 的膨胀会比较厉害,相应地会影响 Task 的写入速度以及 Checkpoint 的成功率。
  • 对于 Compaction 算子,当在执行 Compaction 阶段时,会和数据读写算子进行资源的抢占,也会导致任务的背压、Checkpoint 超时等。

为了解决这个问题,我们把 Compaction 进行单独拆分,拆分为一个独立的调度任务,同时为了合并的合理性,对相关的合并计划也做了一些优化。

除此之外,我们还做了一些其他的优化和 bug 修复。

  • 第一,在 Hudi 同步 Hive 分区时,会对 Hive 外表和 Hudi 表当前表结构、分区做比较,会获取 Hive 的所有分区,而我们在 Hive 层面对分区访问做了限制,超过分区数量限制,禁止访问,所以触发了该问题,我们通过修改源码,如果是分区表,对访问 Hive 的分区做了过滤,只访问最近一段时间的分区。
  • 第二,在业务 MySQL 升级时,出现了混合模式的 Binlog,导致任务失败,也是修改了源码,忽略了一些 DML 的 Binlog 操作,具体 patch 可以参考 3319;
  • 第三,解决了一些 Flink CDC 分片的 bug,如 Flink CDC 分片字段是 String 时,比较逻辑没有忽略大小写,导致抽取全量数据到内存,导致任务失败。Flink CDC 分片字段是 bigint 时,ID 差值较大,触发了 Flink CDC 的分片优化逻辑,但在优化逻辑后加载了大量数据到内存中,所以优化参数,降低数据分布因子等。

除此之外,还有一些其他优化实践,大部分可以查阅资料或在社区的帮助下解决,在这里再次感谢社区。

四、未来规划

简单介绍一下我们的未来规划:

  • 第一,全量阶段的异步切片逻辑优化,目前数据全量读取阶段读取流程为首先通过主键对表进行 Snapshot Chunk 划分,再将 Snapshot Chunk 分发给多个 SourceReader。在所有 Snapshot Chunk 读取完成后,下发一个 Binlog chunk 进行增量部分的 Binlog 读取。Snapshot Chunk 划分及读取是顺序的,影响整个读取阶段的性能,导致大表全量接入阶段周期长。所以优化 Snapshot Chunk 划分切片的逻辑,增加异步读取策略,提升全量读取阶段性能
  • 第二,目前我们的数据集成工具只覆盖了少量的离线场景,后续准备覆盖更多的离线数据集成场景。
  • 第三,目前我们的实时数据集成的质量相对还比较薄弱,需要进一步加强和打磨。

点击查看直播回放和演讲 PPT


更多内容


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:阿里云免费试用 - 阿里云

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/454024.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mybatis-Plus详解(新建maven项目、查询所有信息、打印SQL日志、实现CRUD(增删改查)、分页、条件查询且分页,前后端分离式开发)

Mybatis-Plus详解(新建maven项目、查询所有信息、打印SQL日志、实现CRUD(增删改查)、分页、条件查询且分页,前后端分离式开发) MyBatis-Plus(opens new window) (简称MP) 是一个MyBatis(opens new window)的增强工具,在MyBatis的基础上只做增强不做改变…

【牛客网】最难的问题与因子个数

目录 一、编程题 1.最难的问题 2.因子个数 一、编程题 1.最难的问题 链接:最难的问题__牛客网 (nowcoder.com) NowCoder生活在充满危险和阴谋的年代。为了生存,他首次发明了密码,用于军队的消息传递。假设你是军团中的一名军官&#xff…

Linux网络服务之yum仓库

目录 一、yum仓库简介二. ftp搭建yum源三. 搭建国内在线源四. 本地源和在线yum同时使用五. 通过缓存的方式保存所下载的软件包六 . 制作yum仓库 一、yum仓库简介 yum是一个基于RPM包(是Red-Hat Package Manager红帽软件包管理器的缩写)构建的软件更新机…

TortoiseSVN使用-权限配置

文章目录 3.5 权限配置3.5.1 单一版本库权限配置3.5.2 多版本库共享配置 3.5 权限配置 3.5.1 单一版本库权限配置 ①要设置授权访问就需要创建用户,并为用户设定权限 ②打开授权访问的配置 [1]打开 D:\DevRepository\Subversion\ERP\conf\svnserve.conf [2]将第 …

Day953.以假设驱动为指引 -遗留系统现代化实战

以假设驱动为指引 Hi,我是阿昌,今天学习记录的是关于以假设驱动为指引的内容。 很多人在做遗留系统现代化的时候呢,总觉得它是一个十分复杂的技术问题。 本来嘛,无论是代码的重构、架构的拆分,还是 DevOps 平台的搭…

2023年 团体程序设计天梯赛个人感悟及总结(附题解)——遗憾国三

今年也是第一次参加了天梯赛,在这里也写一下自己的一些赛前准备、比赛过程的一些问题,以及赛后的一些总结以及感悟叭!首先赛前准备的话也不能说我准备的非常的充分吧,但是L2阶的题目我是真的刷的很猛很疯的呢,这样看来…

Python类的继承

一、类的继承 1、什么是继承 通过继承基类来得到基类的功能 所以我们把被继承的类称作父类或基类,继承者被称作子类 代码的重用 2、父(基)类与子类 子类拥有父类的所有属性和方法 父类不具备子类自有的属性和方法 3、继承的用法 定义…

vite+react+ts+mobx+antd+react-router-dom+sass+tailwindcss

写了Vue项目比较多了,最近想换一下react技术栈,锻炼自己的技术,废话不多说,开始创建项目吧,写这篇博客也只是记录我创建的过程,不通的版本难免有坑,欢迎一起分享讨论下! 1、npm create vite //…

【李老师云计算】Spark配置及Scala实现100个随机数找最大值

索引 前言1. Spark部署1.1 Spark下载1.2 解压Spark1.3 修改环境变量1.4 修改主机Spark配置文件1.4.1 slaves.template文件配置1.4.2 spark-env.sh.template文件配置 1.5 分享主机Spark到从机1.6 启动Spark集群(★重启后的操作)1.7 通过jps查看是否启动成功1.8 通过网页查看是否…

rk3568 适配摄像头 (mipi 单摄)

rk3568 适配摄像头 (mipi 单摄) MIPI CSI(Mobile Industry Processor Interface Camera Serial Interface)是一种用于移动设备的高速串行接口标准,用于连接图像传感器和图像处理器。MIPI CSI接口使用差分信号传输技术,将数据分为…

C/C++ 高精度(加减乘除)算法二进制优化

高级精度算法系列 第一章 简单实现 第二章 压位优化 第三章 二进制优化(本章) 文章目录 高级精度算法系列前言一、基本原理1、存储方式2、计算方式 二、关键实现1、整型转高精度数组(二进制)2、字符串转高精度数组(二进制)3、高精…

小程序进阶

1.1组件基础 自定义组件的结构与页面是一致的,即也包含有4个部分,分别为: .wxml 组件的布局结构 .js 组件的处理逻辑 .json 组件的配置文件 .wxstngs 组件的布局样式 1.1.1创建组件 通常将组件放到独立的目录components当中这个目录需要手动创建 …

Spring Boot的配置文件

目录 配置文件的作用 配置文件的格式 properties配置文件 格式 注释乱码问题 读取配置文件 properties的优缺点分析 YAML yml基本语法 yml配置的读取 注意事项:value的值加单双引号 配置对象 yml优点分析 properties和yml的区别 设置不同环境的配置文件 配置文件的…

Linux-搭建web服务器

综合练习:请给openlab搭建web网站 ​ 网站需求: ​ 1.基于域名[www.openlab.com](http://www.openlab.com)可以访问网站内容为 welcome to openlab!!! ​ 2.给该公司创建三个子界面分别显示学生信息,教学资料和缴费网站,基于[www.…

SpringCloud --- Ribbon负载均衡

一、负载均衡原理 SpringCloud底层其实是利用了一个名为Ribbon的组件,来实现负载均衡功能的。 那么我们发出的请求明明是http://userservice/user/1,怎么变成了http://localhost:8081的呢? 二、源码跟踪 为什么我们只输入了service名称就…

浅谈: 计算机—JVM—Java线程—池

计算机的基本组成 计算机的基本组成 计算机存储模型(CPU、寄存器、高速缓存、内存、外存) 现代计算机系统CPU和内存之间其实是有一个cache的层级结构的。比内存速度更快的存储介质(SRAM),普通内存一般是DRAM,这种读写速度更快的介质充当CPU和内存之间的…

3 连续模块(二)

3.5 零极点增益模块 在控制系统设计和分析中,常用的函数包括 传递函数(tf)、零极点(zpk)和状态空间(ss)函数 传递函数(tf):用于表示线性时不变系统的输入输出…

SQL Compliance Manager Crack

SQL Compliance Manager Crack 新的SQL CM云代理-扩展了当前SQL CM代理的功能,以支持EC2上Microsoft SQL服务器的远程审核。允许用户添加在共享网络位置上活动的SQL Server,以写入/读取数据并支持DBaaS SQL Server实例。云代理包含与当前SQL代理相同的行…

VS code 插件之中英文间自动添加空格

前言 不知道大家在开发过程中是不是会遇到写代码注释或者文本内容时中英文之间没有空格的情况,很多时候在写代码尤其是写注释的时候容易忘记加空格,但回过头来看又难以忍受,于是我就想着自己写一个 vscode 插件来解决这个问题,希…

跟我一起开启 linux 的学习吧

跟我学 CentOS 的安装 一、安装 VMware二、创建虚拟机三、安装 CentOS 7四、linux 的登录 一、安装 VMware VMware 计算机虚拟化软件 从官网 https://www.vmware.com/cn.html 下载并安装 这里就不再展示安装过程啦! 有需要的可以 点击这里 →→→ VMware 下载安装过…