畅捷通基于Flink的实时数仓落地实践

news2025/1/9 9:50:13

摘要:本文整理自畅捷通总架构师、阿里云MVP专家郑芸老师在 Flink Forward Asia 2023 中闭门会上的分享。内容主要为以下四部分:

  1. 业务背景
  2. 数仓建设
  3. 具体案例
  4. 未来展望

一、业务背景

畅捷通是用友旗下成员企业,一直持续专注于小微企业的数字化转型。我们提供以数智财税、数智商业为核心,以生态服务为延展的小微企业云服务,通过 T+Cloud、好业财、好生意、 好会计、易代账等核心产品,帮助小微企业实现人员、业务、客户、管理的在线,并通过这些在线加快企业向数智化转型的进度,目前畅捷通云平台累计注册用户数超过 800 万。

以下是早期数据处理架构的现状。

早期的数据架构分了层,有自己的业务数据库做的业务数据存储,同时也会有一个比较简单的一个数据仓库。数据的产品线比较多,所以是按照自己的实际业务诉求,选择了比较适合自己的存储技术。无论是 MySQL 还是 PolarDB,对于复杂数据分析工作,使用过程中都发现有些耗资源,这时我们就通过 DTS、DataHub、DataX 等工具进行数据同步,把数据从业务库搬迁到数据仓库中,数据会进入 Maxcompute 中做离线计算,但是数据仅是离线计算的一部分数据,所以最终的查询还要在业务库中去查。每天晚上会把离线算完的数据再搬迁回业务数据库中,最终由业务数据库提供服务。

除了提供用户数据外,有些业务像我们的业务风控管理,除了业务数据还需要分析线上日志和埋点等非结构化的数据,这些也需要我们同步过来,整体这些就构成了我们的数据架构。

在早期的使用过程中,这个架构能平稳的运行,随着产品不断发展,用户规模增大,交易数据也越来越多,这个架构已经不满足我们的诉求,主要表现在如下三个方面:

(1)性能降低

我们是面向企业提供多租户的 SAAS 服务,ToB 业务相对 ToC 要复杂一些,我们会存在一些多表关联的查询,当数据量大的时候,一方面这些查询自身性能低;另一方面他们还会拖累整个业务库性能,简单的分库已经无法有效提升查询效率;另外跑批操作每晚回写业务库,我们曾经发生过凌晨四点业务开始使用的时候,我们的回写任务仍然没有完成导致业务操作受损。

(2)多维度实时分析的业务诉求

业务有更早产出数据的诉求,现在 T+1 场景已经不满足业务的时效性;产品需要更多的大数据分析能力,进行精细化管理。

(3)现有的 ETL 模型不统一

现有的数据仓库技术不统一,使用与同步成本高,尤其表现在数据同步延迟或者丢失时没有有效监控手段导致问题定位时间长;再加上我们线上已有几百个数据库实例,配置更改不仅耗时而且容易出错,稍有不慎就有可能引发线上故障,尤其近一个月 P0 故障频发。

最后因为我们是 SAAS 产品,每两周业务迭代必然引发数据库的 DDL 变动,这更加重了我们的同步难度;

这些都导致我们要重构我们的数据处理架构,将复杂的分析操作从 OLTP 数据库中剥离出来转移到统一的 OLAP 系统去处理,为了实现这个目标,我们需要解决两大难题

一选择合适的数据流处理程序,能将原始数据正确导入到数据仓库

二选择合适的分析引擎进行数据分析

其次我们需要合适的数据流技术把原始数据正确导入到数据仓库对于数据流计算引擎我们希望这个数据流框架配置简单、能自动感知 DDL 变更、对于全量数据和增量数据采用同一套处理机制,尽量让业务人员只关注业务逻辑。

而对于分析引擎,我们首先要保证的是性能。对于 ToB 业务,交易数据具有更新频繁、高 QPS 以及复杂多表 Join 的特点,所以我们的引擎选择必须在不牺牲性能的情况下支持这些业务特性。

二、数仓建设

首先介绍数据流的发展历程。

整个数据流分为四个阶段。第一阶段只是使用 DTS,开源版本是 Kano。DTS同步时是在源端到目标端直接拉取同步链路,从业务库传入数据仓库中,全量采用一次性迁移,增量采用 Binlog 同步的方式来处理,运行一段时间后,发现我们的下游应用越来越多,每个应用都要建一条同步通道,即不好维护成本也高。所以我们采用 DTS+DataHub,进行链路收敛。将链路收敛后下游如果需要只用从 DataHub 中按需订阅即可。之后我们又发现简单的数据搬迁不满足我们的诉求,我们需要在数据同步过程中做简单的清理操作,比如增加一些聚合属性,这个时候我们引入了 Flink CDC 进行流计算处理,在使用 Flink 过程中,我们发现一些业务场景需要的就是明细数据,这个时候我们也使用了阿里 Flink 版本的语法糖 CDAS 技术,去实现整库多表的 DDL 和 DML 的自动化同步,提高了集成效率。在验证过程中,我们还发现 CDAS 是一体化自动操作,所以对资源的利用率是高于原生 CDC 的,主要原因是它可以复用一个源表节点读取多业务表数据,这样可以降低数据库连接数,避免重复拉取 Binlog 数据,达到节省资源的目的。

在进行流计算引擎选型时,我们主要对如下三个主流开源引擎做了对比:Apache Storm 、Spark Streaming 和 Flink。

首先 Apache Storm 是 twitte 开发的第一代流处理系统,基于 Record 级别处理数据,毫秒级延迟,它采用的 ack 机制对消息保障能力弱,At Least Once 保证了数据至少传输一次不被丢失但不保证重复性,因为这个容错机制(实现 At Least Once 语义),它的吞吐量也相对较低。其次 Spark Streaming 是微批处理的流模型,以固定时间(几秒)处理一段段的批处理作业,它采用 Checkpoint 机制保证了数据 Exactly once,精准传输一次,它的这种处理机制导致延迟高,但是吞吐量也高。最后来看 Flink,即支持流处理也支持微批和批处理,它采用异步分布式快照,能保证 Exactly once,同时容错代价也低,它是一个低延迟高吞吐的分布式流处理引擎。综上所述我们最终选择了 Flink。目前 Flink 线上的 Task 大概有六千多,每天涉及到的业务数据有五亿以上。

介绍完流存引擎后,再来介绍数仓的选型。我们更多考虑了开源版本 Clickhouse 和 StarRocks 能力的对比。

第一考虑应用程序的迁移。StarRocks 支持标准 SQL 语言,兼容 MySQL 协议,迁移方案对应用程序更友好。而 Clickhouse 对于标准 SQL 语法支持不完善。第二是性能方面,Clickhouse 单机单表的查询性能最优,但是在复杂查询场景中不可能将所有数据都放在一个大表中,需要复杂表的 Join 操作,对比了复杂查询的性能,StarRocks 性能更优。最后考虑并发能力,Clickhouse 并发能力较低,官方建议低于 100,而 StarRocks 可以采用多机多核的方式水平扩展提升并发。最终选型 StarRocks 作为分析引擎。

可以看到最终数仓的整个架构如图:

目前离线数仓和实时数仓并行,对于非实时要求的我们仍然在离线数仓 Maxcompute 里计算但不回写到业务库里,离线计算的内容我们尽量通过外表的方式进行访问,让数据保持一份,对于实时要求的我们通过 Flink 同步到 StarRocks 进行实时分析。现阶段 StarRocks 还不支持 Maxcompute 的外表使用,如果需要使用 mc 的计算结果,我们还需要将Maxcompute的内容再同步到 StarRocks 中进行使用,如果支持 Maxcompute 外表的话,我们就可以采用外表+物化视图的方式进行查询加速。整个链路虽然清晰,但使用的技术较多,所以最终重点做了链路监控,保证了数据一致性、结构一致性以及同步链路的延迟问题,发现异常情况下通过报警机制能够快速感知到。最终整个架构要求做到如果有业务分析的数据,该数据必须进入数仓中,同时清洗数据必须全面准确,展示需要通过可视化工具提供统一的建模方式提供给业务员进行使用。

数据仓库最终沉淀出一个数据中台,采用采、管、存、用一体化支撑业务对数据服务的诉求。

存储在业务库里的数据通过我们同步链路将数据采集出来,进行简单清理之后转存到我们的数据仓库里,然后采用统一的建模语言进行规范化的管理,我们使用统一的元数据进行指标描述,再利用同一套可视化设计器进行仪表板或者大屏的展示,最终形成了我们的语义模型资产,比如财务模型、增长模型等等,最后将这些分析好的数据按照不同场景统一提供给各个不同的应用进行使用。这些数据不仅包括我们企业的数据,还包括生态合作的数据。

三、具体案例

介绍完基础架构后,来介绍几个案例。

案例一是基于场景化的决策分析。

一个企业需要经常关心经营数据,有一些工具帮助分析指标项,但是对于小微企业使用专业工具进行分析,难度会大,学习成本也很高。所以产品需要满足一定的行业特性,能够开箱即用的简单数据分析能力。

为了做一个通用产品,支持多种方式,在整个指标体系过程中,会发现参与整个指标计算的维度例如科目维度都有所差异。此外,根据行业不同,指标定义也不同,例如零售服务业更关注收入利用率,制造业更关注成本。

所以难点是需要在兼顾开箱即用的基础上如何支持各企业不同的变化,更快计算出所需的数据。业务痛点首先需要支持按用户的需求能够支持不同维度的自定义,第二因为指标是高度汇总的,通过大屏会发现很多指标都在一个页面上显示,性能无法跟上。第三实时大屏数据本身需要不断刷新,最后需要做行业数据对标。从这些方面考虑,基于场景的分析模型会比较复杂。仅仅放在一个流计算中做计算清洗不够,所以我们采用的数据集成方案是ELT模式,业务数据直接提取到数据仓库中,借助数据仓库的计算能力进行分层汇总。

以我们的盈利分析为例,从下往上,我们建立了贴原层、明细层、中间层,贴原层存放原始单据比如我们的凭证及其分录表,明细层按照业务场景转换成指标计算需要的数据模型比如我们的辅助科目余额表,然后中间层存放的是基于辅助科目余额表计算出来的各类指标。我们的指标分为原子指标和派生指标,同一层同一维度的指标批量计算完才继续更高层的指标计算,这样可以提升我们整体的的指标计算效率。

基础架构如图所示:

业务正常读写数据到业务库中,我们使用 Flink CDAS 进行整表同步,全量同步使用 Snapshot,增量同步使用 Binlog,由 Flink 自动帮我们进行全量转增量的切换,最终原始单据进入到数仓中,再通过调度程序进行清洗转换,清洗后的数据在 StarRocks 中进行计算,计算数据通过分层模型建立标准,最终的建模模型,建模口径都一样。所以我们可以做到一套业务做不同分析给不同场景提供服务。

第二个场景是 BC 一体跨租户场景的管控模式。

企业在数字化浪潮情况下,经营重心转向了“一切以客户为中心”的经营模式。头部企业例如大品牌商公司蒙牛、飞利浦、元气森林等,需要通过 BC 一体化方式严格管控经销商,从而能够达到零售面向用户的动销模式。这种情况下品牌商需要知道他的一级、二级经销商的销售和库存情况,一旦商品下发到经销商后,经销商需要将它的销售情况和库存情况上报给品牌商,品牌商会统一管控,决定下一年的生产。业务数据库是 SQL Server 数据库,SQL Server 在 2016 版本之前不支持 CDC,而我们不可能要求用户都升级到新版本。一开始我们使用自研的 ETL 数据框架,产品先有一个数据转换服务进行数据清理和数据上报,为了不对数仓产生太大压力,我们通过 OSS 和消息队列进行系统解耦,然后再有一个专门加载的程序订阅变更消息进行数据上报。在加载服务中需要考虑两点:一是需要考虑高可用性,第二需要考虑被压机制,当上游数据太多全部压入到下游数据时,很容易压垮StarRocks,此时需要放慢加载的速率。后来发现 Flink 源表支持云存储 OSS,在 OSS 中只需要设置 source.monitor-interval 就会定时按照指定的时间间隔扫描不同路径,当扫描到有文件发生变化时,自动感知自动同步。所以使用Flink方案替代了自研的加载服务。

第三个案例是基于 ES 的准实时搜索。

这个业务场景在很多电商领域都存在,我们有一个微商城产品需要根据用户输入条件支持商品的智能检索,这个项目的特点是要求低延迟,高质量,也就是商品属性如价格一旦发生变动就需要实时同步到 Elasticsearch 里,进而能够直接被业务端直接感知。

基于这些考虑,基础架构如图。

这个技术方案我们将清理转换任务交给了 Flink 去处理,首先使用 Flink 做数据清洗,然后转到 ES 中。为了保证链路准确并且不中断,我们使用双链路方式进行保证。分为两个 Flink,当一个 Flink 出问题时另一个 Flink 会替代。在过程中如果链路发生中断需要及时感知并且修复问题。当时也做了一个方案,主动向 Datahub 中投递监测数据,定时检测数据延迟,当延迟高的时候进行预警,所以该项目中使用双链路以及主动的监控方案做了高可用处理,满足了业务诉求。

四、未来展望

以前是大数据时代,现在是大模型时代,AI 和大数据怎么结合使用?上图是双轮驱动的模式。企业的数智化转型分三个阶段,第一个是云化连接(上云)阶段,即企业通过上云实现基础的数字化连接;第二是数据驱动(用数)阶段,企业开始利用积累的数据资源,驱动决策和优化流程;第三个是智能运营(赋智)阶段,通过深度融合AI原生技术,实现运营的智能化和自动化。在这一过程中,数据资源的高效利用和智能技术的创新应用,成为推动企业数智化进阶的核心动力。

大模型时代无数据不智能,大模型本身就是在海量数据集上进行的训练,比如 GPT-4 在约 13 万亿 Tokens 上训练,据说它已经看完市面上所有的视频。大模型天然支持海量的数据查询,那么在这种情况下为什么还需要把数据查询和大模型结合起来?因为那些数据都属于通识类数据,但是落到一个具体的企业上时,企业的私有数据往往是大模型不具备的。我们要分析一个企业自己的经营状况时只能拿企业的私有数据结合大模型的推理能力和分析能力帮助企业做洞察,所以需要双轮驱动方式。这种双轮驱动的方式,不仅能够提升数据查询的效率,还能增强数据服务的智能性。我们期待技术的进步,如自然语言人机交互、实时数据查询、多维分析、湖仓一体以及 HTAP(混合事务/分析处理),能够进一步成熟,为企业提供更加强大和灵活的数据分析工具,推动企业在数智化道路上不断前行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1994872.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

七夕情人节有什么好物推荐?五款心动数码产品推荐。

转眼间,七夕情人节即将来临,这个浪漫的节日是表达爱意、传递心意的最佳时刻!为了让爱意不再迷茫,团团精心挑选了一份情人节约会好物清单。这些精选产品,我亲自试用,确信它们能触动心弦,传递深情…

html+css 实现hover 3D按钮特效

前言:哈喽,大家好,今天给大家分享htmlcss 绚丽效果!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏关注哦 💕 目…

《向量数据库指南》——To B大型知识系统的多租设计

To B大型知识系统的多租设计 这类场景中,租户数量一般比较少。比如企业内多个独立的业务团队或部门,如果他们都在提供不同的知识库服务,那么对于数据库中台团队,每一个这样的业务团队或部门都是一个租户。 在向量数据库层面,中台团队需要根据业务复杂度为每个租户分…

儿童餐椅 亚马逊美国站ASTM认证步骤

本政策适用的儿童餐椅 儿童餐椅是一种供三岁以下儿童使用的独立座椅。座椅表面超出地面 15 英寸以上,可提高儿童所坐高度,通常于儿童进食时使用。儿童餐椅可由塑料、木材或金属制成,通常配有软垫座。儿童餐椅有带托盘和不带托盘的款式&#…

口碑最好的洗地机排名?洗地机十大排名公开揭晓!

随着现在洗地机逐渐走入大众的眼睛,很多人已经开始寻找合适的洗地机,但是因为市面上的品牌太多了,宣传的噱头也比较多,因此很多人都比较迷糊,不知道应该选择怎样的洗地机!本文将公开揭晓洗地机领域的十大排…

数学建模--蒙特卡洛算法之电子管更换刀片寿命问题

目录 1.电子管问题重述 2.电子管问题分析 3.电子管问题求解 4.刀片问题重述 5.刀片问题分析 6.刀片问题求解 1.电子管问题重述 某设备上安装有4只型号规格完全相同的电子管,已知电子管寿命服从100~200h之间的均匀分布. 只要有一个电子管…

P5677 [GZOI2017] 配对统计(线段树爆改莫队)

[GZOI2017] 配对统计 - 洛谷 贵州的省选,线段树代码太长了,直接莫队启动。 时间复杂度O(n sqrt(n)) 加了个奇偶优化。 先按块排左指针。 同一块内右指针按照块的编号奇偶性决定升序或者降序。 核心思路 拓展左右指针 或者…

【Linux】(32)详解命名管道 | 日志管理 | 进程池2.0

目录 1. 介绍 2. 理解 3.运用 3.1 简易通信 makefile comm.hpp server.cc (服务端,读取显示) client.cc (客户端,写入) 3.2 日志 log.hpp 1. 定义日志级别 2. 实现日志函数 可变参数 3. 日志输出管理 3…

【经验分享】快速制图 AI + Mermaid

文章目录 【经验分享】快速制图 AI Mermaid使用教程选图改Prompt出图 【经验分享】快速制图 AI Mermaid 使用教程 选图 在Mermaid的Diagram Syntax中,选择需要转换的样式图,复制对应的示例代码: 改Prompt 请根据这份文档帮我制作一份…

镜舟科技与西南证券合作,构建极速、高效数据平台

《金融科技发展规划(2022-2025年)》明确了高质量推进金融数字化转型的总体思路,云计算、人工智能等新兴技术开始被广泛应用,提升金融服务的便捷性,但随着日益增长的数据体量,数据的存储和处理能力日渐。 数…

【前 K 个高频元素】python刷题记录

R4-排序篇 class Solution:def topKFrequent(self, nums: List[int], k: int) -> List[int]:#使用哈希表记录值,再sortdictdefaultdict(int)ret[]for num in nums:dict[num]1thesorted(dict.keys(),keylambda x:dict[x], reverseTrue)for i in range(k):ret.appe…

python-docx-template实现docx模板编程

1、python-docx-template简介 python-docx库来创建word文档,但是对于文档的修改功能并不灵活。python-docx-template 模块主要依赖两个库, python-docx用于读取,编写和创建子文档 , jinja2用于管理插入到模板docx中的标签 。 其基…

90天SSL证书时代来临的应对策略-SSL证书自动化运维

SSL证书有效期长期以来都是网络安全专业人士关注的重点,尤其是去年3月Google在CA/B论坛投票提案中提议将SSL证书的有效期限制在90天之后,毕竟这对高度依赖手动管理证书的企业影响巨大。本文将概述对证书有效期缩短的理解以及应对策略。 90天是最终的有效…

Opencv-绘制几何图形

1. 绘制圆形 1.1 circle()函数原型 void cv::circle(InputOutputArray img, Point center, int radius, const Scalar & color, int thickness 1, int lineType LINE_8, int shift 0 ) img:需要绘制圆形的图像。 center:圆形的圆心位置坐标。 …

Python中的 `continue` 语句:掌握循环控制的艺术

Python中的 continue 语句:掌握循环控制的艺术 下滑即可查看博客内容 🌈 欢迎莅临我的个人主页 👈这里是我静心耕耘深度学习领域、真诚分享知识与智慧的小天地!🎇 🎓 博主简介:985高校的普通…

国产数据库备份恢复实现

数据库备份恢复是数据库高可用的基本能力,如何通过备份数据快速高效的恢复业务并且满足不同场景下的恢复需求,是各数据库厂商需要关注的要点。本文将介绍几种国产数据库的备份恢复功能,以加深了解。 1、数据库备份恢复方案 数据库备份是生产…

为什么选择在Facebook投放广告?

2024年了你还没对 Facebook 广告产生兴趣?那你可就亏大了! 今天这篇文章,我们会分享它对你扩大业务的好处。要知道,Facebook 广告凭借它庞大的用户群和先进的定位选项,已经是企业主们有效接触目标受众的必备神器。接下…

<数据集>固定视角监控牧场绵羊识别数据集<目标检测>

数据集格式:VOCYOLO格式 图片数量:3615张 标注数量(xml文件个数):3615 标注数量(txt文件个数):3615 标注类别数:1 标注类别名称:[Sheep] 序号类别名称图片数框数1Sheep361529632 使用标注工具&#…

leetcode26_删除有序数组中的重复项

思路 双指针 func removeDuplicates(nums []int) int {if len(nums) < 2 {return len(nums)}// 双指针//区间 [0, slow] 代表已遍历且不重复元素//区间 [fast,len(nums)) 代表还未遍历的元素slow, fast : 0,1for ;fast < len(nums);{if nums[slow] ! nums[fast] {slown…

DICT运维服务目录

1、CT类项目或交付物的运维工作由网络条线负责,相关工作的SLA、流程、职责划分等按网络条线的相关制度执行 2、本服务目录主要面向IT部分的运维工作IT模式分类一级服务名称二级服务名称服务内容服务目标服务指标服务指标目标值建议工作结果呈现基础运维包增值服务指标参考费用…