摘要:本文整理自畅捷通总架构师、阿里云MVP专家郑芸老师在 Flink Forward Asia 2023 中闭门会上的分享。内容主要为以下四部分:
- 业务背景
- 数仓建设
- 具体案例
- 未来展望
一、业务背景
畅捷通是用友旗下成员企业,一直持续专注于小微企业的数字化转型。我们提供以数智财税、数智商业为核心,以生态服务为延展的小微企业云服务,通过 T+Cloud、好业财、好生意、 好会计、易代账等核心产品,帮助小微企业实现人员、业务、客户、管理的在线,并通过这些在线加快企业向数智化转型的进度,目前畅捷通云平台累计注册用户数超过 800 万。
以下是早期数据处理架构的现状。
早期的数据架构分了层,有自己的业务数据库做的业务数据存储,同时也会有一个比较简单的一个数据仓库。数据的产品线比较多,所以是按照自己的实际业务诉求,选择了比较适合自己的存储技术。无论是 MySQL 还是 PolarDB,对于复杂数据分析工作,使用过程中都发现有些耗资源,这时我们就通过 DTS、DataHub、DataX 等工具进行数据同步,把数据从业务库搬迁到数据仓库中,数据会进入 Maxcompute 中做离线计算,但是数据仅是离线计算的一部分数据,所以最终的查询还要在业务库中去查。每天晚上会把离线算完的数据再搬迁回业务数据库中,最终由业务数据库提供服务。
除了提供用户数据外,有些业务像我们的业务风控管理,除了业务数据还需要分析线上日志和埋点等非结构化的数据,这些也需要我们同步过来,整体这些就构成了我们的数据架构。
在早期的使用过程中,这个架构能平稳的运行,随着产品不断发展,用户规模增大,交易数据也越来越多,这个架构已经不满足我们的诉求,主要表现在如下三个方面:
(1)性能降低
我们是面向企业提供多租户的 SAAS 服务,ToB 业务相对 ToC 要复杂一些,我们会存在一些多表关联的查询,当数据量大的时候,一方面这些查询自身性能低;另一方面他们还会拖累整个业务库性能,简单的分库已经无法有效提升查询效率;另外跑批操作每晚回写业务库,我们曾经发生过凌晨四点业务开始使用的时候,我们的回写任务仍然没有完成导致业务操作受损。
(2)多维度实时分析的业务诉求
业务有更早产出数据的诉求,现在 T+1 场景已经不满足业务的时效性;产品需要更多的大数据分析能力,进行精细化管理。
(3)现有的 ETL 模型不统一
现有的数据仓库技术不统一,使用与同步成本高,尤其表现在数据同步延迟或者丢失时没有有效监控手段导致问题定位时间长;再加上我们线上已有几百个数据库实例,配置更改不仅耗时而且容易出错,稍有不慎就有可能引发线上故障,尤其近一个月 P0 故障频发。
最后因为我们是 SAAS 产品,每两周业务迭代必然引发数据库的 DDL 变动,这更加重了我们的同步难度;
这些都导致我们要重构我们的数据处理架构,将复杂的分析操作从 OLTP 数据库中剥离出来转移到统一的 OLAP 系统去处理,为了实现这个目标,我们需要解决两大难题
一选择合适的数据流处理程序,能将原始数据正确导入到数据仓库
二选择合适的分析引擎进行数据分析
其次我们需要合适的数据流技术把原始数据正确导入到数据仓库对于数据流计算引擎我们希望这个数据流框架配置简单、能自动感知 DDL 变更、对于全量数据和增量数据采用同一套处理机制,尽量让业务人员只关注业务逻辑。
而对于分析引擎,我们首先要保证的是性能。对于 ToB 业务,交易数据具有更新频繁、高 QPS 以及复杂多表 Join 的特点,所以我们的引擎选择必须在不牺牲性能的情况下支持这些业务特性。
二、数仓建设
首先介绍数据流的发展历程。
整个数据流分为四个阶段。第一阶段只是使用 DTS,开源版本是 Kano。DTS同步时是在源端到目标端直接拉取同步链路,从业务库传入数据仓库中,全量采用一次性迁移,增量采用 Binlog 同步的方式来处理,运行一段时间后,发现我们的下游应用越来越多,每个应用都要建一条同步通道,即不好维护成本也高。所以我们采用 DTS+DataHub,进行链路收敛。将链路收敛后下游如果需要只用从 DataHub 中按需订阅即可。之后我们又发现简单的数据搬迁不满足我们的诉求,我们需要在数据同步过程中做简单的清理操作,比如增加一些聚合属性,这个时候我们引入了 Flink CDC 进行流计算处理,在使用 Flink 过程中,我们发现一些业务场景需要的就是明细数据,这个时候我们也使用了阿里 Flink 版本的语法糖 CDAS 技术,去实现整库多表的 DDL 和 DML 的自动化同步,提高了集成效率。在验证过程中,我们还发现 CDAS 是一体化自动操作,所以对资源的利用率是高于原生 CDC 的,主要原因是它可以复用一个源表节点读取多业务表数据,这样可以降低数据库连接数,避免重复拉取 Binlog 数据,达到节省资源的目的。
在进行流计算引擎选型时,我们主要对如下三个主流开源引擎做了对比:Apache Storm 、Spark Streaming 和 Flink。
首先 Apache Storm 是 twitte 开发的第一代流处理系统,基于 Record 级别处理数据,毫秒级延迟,它采用的 ack 机制对消息保障能力弱,At Least Once 保证了数据至少传输一次不被丢失但不保证重复性,因为这个容错机制(实现 At Least Once 语义),它的吞吐量也相对较低。其次 Spark Streaming 是微批处理的流模型,以固定时间(几秒)处理一段段的批处理作业,它采用 Checkpoint 机制保证了数据 Exactly once,精准传输一次,它的这种处理机制导致延迟高,但是吞吐量也高。最后来看 Flink,即支持流处理也支持微批和批处理,它采用异步分布式快照,能保证 Exactly once,同时容错代价也低,它是一个低延迟高吞吐的分布式流处理引擎。综上所述我们最终选择了 Flink。目前 Flink 线上的 Task 大概有六千多,每天涉及到的业务数据有五亿以上。
介绍完流存引擎后,再来介绍数仓的选型。我们更多考虑了开源版本 Clickhouse 和 StarRocks 能力的对比。
第一考虑应用程序的迁移。StarRocks 支持标准 SQL 语言,兼容 MySQL 协议,迁移方案对应用程序更友好。而 Clickhouse 对于标准 SQL 语法支持不完善。第二是性能方面,Clickhouse 单机单表的查询性能最优,但是在复杂查询场景中不可能将所有数据都放在一个大表中,需要复杂表的 Join 操作,对比了复杂查询的性能,StarRocks 性能更优。最后考虑并发能力,Clickhouse 并发能力较低,官方建议低于 100,而 StarRocks 可以采用多机多核的方式水平扩展提升并发。最终选型 StarRocks 作为分析引擎。
可以看到最终数仓的整个架构如图:
目前离线数仓和实时数仓并行,对于非实时要求的我们仍然在离线数仓 Maxcompute 里计算但不回写到业务库里,离线计算的内容我们尽量通过外表的方式进行访问,让数据保持一份,对于实时要求的我们通过 Flink 同步到 StarRocks 进行实时分析。现阶段 StarRocks 还不支持 Maxcompute 的外表使用,如果需要使用 mc 的计算结果,我们还需要将Maxcompute的内容再同步到 StarRocks 中进行使用,如果支持 Maxcompute 外表的话,我们就可以采用外表+物化视图的方式进行查询加速。整个链路虽然清晰,但使用的技术较多,所以最终重点做了链路监控,保证了数据一致性、结构一致性以及同步链路的延迟问题,发现异常情况下通过报警机制能够快速感知到。最终整个架构要求做到如果有业务分析的数据,该数据必须进入数仓中,同时清洗数据必须全面准确,展示需要通过可视化工具提供统一的建模方式提供给业务员进行使用。
数据仓库最终沉淀出一个数据中台,采用采、管、存、用一体化支撑业务对数据服务的诉求。
存储在业务库里的数据通过我们同步链路将数据采集出来,进行简单清理之后转存到我们的数据仓库里,然后采用统一的建模语言进行规范化的管理,我们使用统一的元数据进行指标描述,再利用同一套可视化设计器进行仪表板或者大屏的展示,最终形成了我们的语义模型资产,比如财务模型、增长模型等等,最后将这些分析好的数据按照不同场景统一提供给各个不同的应用进行使用。这些数据不仅包括我们企业的数据,还包括生态合作的数据。
三、具体案例
介绍完基础架构后,来介绍几个案例。
案例一是基于场景化的决策分析。
一个企业需要经常关心经营数据,有一些工具帮助分析指标项,但是对于小微企业使用专业工具进行分析,难度会大,学习成本也很高。所以产品需要满足一定的行业特性,能够开箱即用的简单数据分析能力。
为了做一个通用产品,支持多种方式,在整个指标体系过程中,会发现参与整个指标计算的维度例如科目维度都有所差异。此外,根据行业不同,指标定义也不同,例如零售服务业更关注收入利用率,制造业更关注成本。
所以难点是需要在兼顾开箱即用的基础上如何支持各企业不同的变化,更快计算出所需的数据。业务痛点首先需要支持按用户的需求能够支持不同维度的自定义,第二因为指标是高度汇总的,通过大屏会发现很多指标都在一个页面上显示,性能无法跟上。第三实时大屏数据本身需要不断刷新,最后需要做行业数据对标。从这些方面考虑,基于场景的分析模型会比较复杂。仅仅放在一个流计算中做计算清洗不够,所以我们采用的数据集成方案是ELT模式,业务数据直接提取到数据仓库中,借助数据仓库的计算能力进行分层汇总。
以我们的盈利分析为例,从下往上,我们建立了贴原层、明细层、中间层,贴原层存放原始单据比如我们的凭证及其分录表,明细层按照业务场景转换成指标计算需要的数据模型比如我们的辅助科目余额表,然后中间层存放的是基于辅助科目余额表计算出来的各类指标。我们的指标分为原子指标和派生指标,同一层同一维度的指标批量计算完才继续更高层的指标计算,这样可以提升我们整体的的指标计算效率。
基础架构如图所示:
业务正常读写数据到业务库中,我们使用 Flink CDAS 进行整表同步,全量同步使用 Snapshot,增量同步使用 Binlog,由 Flink 自动帮我们进行全量转增量的切换,最终原始单据进入到数仓中,再通过调度程序进行清洗转换,清洗后的数据在 StarRocks 中进行计算,计算数据通过分层模型建立标准,最终的建模模型,建模口径都一样。所以我们可以做到一套业务做不同分析给不同场景提供服务。
第二个场景是 BC 一体跨租户场景的管控模式。
企业在数字化浪潮情况下,经营重心转向了“一切以客户为中心”的经营模式。头部企业例如大品牌商公司蒙牛、飞利浦、元气森林等,需要通过 BC 一体化方式严格管控经销商,从而能够达到零售面向用户的动销模式。这种情况下品牌商需要知道他的一级、二级经销商的销售和库存情况,一旦商品下发到经销商后,经销商需要将它的销售情况和库存情况上报给品牌商,品牌商会统一管控,决定下一年的生产。业务数据库是 SQL Server 数据库,SQL Server 在 2016 版本之前不支持 CDC,而我们不可能要求用户都升级到新版本。一开始我们使用自研的 ETL 数据框架,产品先有一个数据转换服务进行数据清理和数据上报,为了不对数仓产生太大压力,我们通过 OSS 和消息队列进行系统解耦,然后再有一个专门加载的程序订阅变更消息进行数据上报。在加载服务中需要考虑两点:一是需要考虑高可用性,第二需要考虑被压机制,当上游数据太多全部压入到下游数据时,很容易压垮StarRocks,此时需要放慢加载的速率。后来发现 Flink 源表支持云存储 OSS,在 OSS 中只需要设置 source.monitor-interval 就会定时按照指定的时间间隔扫描不同路径,当扫描到有文件发生变化时,自动感知自动同步。所以使用Flink方案替代了自研的加载服务。
第三个案例是基于 ES 的准实时搜索。
这个业务场景在很多电商领域都存在,我们有一个微商城产品需要根据用户输入条件支持商品的智能检索,这个项目的特点是要求低延迟,高质量,也就是商品属性如价格一旦发生变动就需要实时同步到 Elasticsearch 里,进而能够直接被业务端直接感知。
基于这些考虑,基础架构如图。
这个技术方案我们将清理转换任务交给了 Flink 去处理,首先使用 Flink 做数据清洗,然后转到 ES 中。为了保证链路准确并且不中断,我们使用双链路方式进行保证。分为两个 Flink,当一个 Flink 出问题时另一个 Flink 会替代。在过程中如果链路发生中断需要及时感知并且修复问题。当时也做了一个方案,主动向 Datahub 中投递监测数据,定时检测数据延迟,当延迟高的时候进行预警,所以该项目中使用双链路以及主动的监控方案做了高可用处理,满足了业务诉求。
四、未来展望
以前是大数据时代,现在是大模型时代,AI 和大数据怎么结合使用?上图是双轮驱动的模式。企业的数智化转型分三个阶段,第一个是云化连接(上云)阶段,即企业通过上云实现基础的数字化连接;第二是数据驱动(用数)阶段,企业开始利用积累的数据资源,驱动决策和优化流程;第三个是智能运营(赋智)阶段,通过深度融合AI原生技术,实现运营的智能化和自动化。在这一过程中,数据资源的高效利用和智能技术的创新应用,成为推动企业数智化进阶的核心动力。
大模型时代无数据不智能,大模型本身就是在海量数据集上进行的训练,比如 GPT-4 在约 13 万亿 Tokens 上训练,据说它已经看完市面上所有的视频。大模型天然支持海量的数据查询,那么在这种情况下为什么还需要把数据查询和大模型结合起来?因为那些数据都属于通识类数据,但是落到一个具体的企业上时,企业的私有数据往往是大模型不具备的。我们要分析一个企业自己的经营状况时只能拿企业的私有数据结合大模型的推理能力和分析能力帮助企业做洞察,所以需要双轮驱动方式。这种双轮驱动的方式,不仅能够提升数据查询的效率,还能增强数据服务的智能性。我们期待技术的进步,如自然语言人机交互、实时数据查询、多维分析、湖仓一体以及 HTAP(混合事务/分析处理),能够进一步成熟,为企业提供更加强大和灵活的数据分析工具,推动企业在数智化道路上不断前行。