目录
前言
一、架构1.0:传统Lambda架构
二、OLAP引擎调研
三、架构2.0:数据服务层All in Apache Doris
四、架构 3.0:基于Doris Multi-Catalog的湖仓一体架构
五、实践经验
5.1 引入Merge-on-Write,百亿级单表查询提速近三倍
5.2 部分列数据更新,数据开发效率提升100%
5.3 丰富Join的优化手段,整体查询速度最高提升近四倍
5.4 Light Schema Change,线上 QPS 高达3000+
5.5 优化实时Join场景,开发成本降低 70%
六、优化经验
6.1 E-235(文件版本过多)
6.2 E-233(事务挤压)
6.3 FE 假死
七、未来规划
原文大佬的这篇Doris湖仓一体建设,这里摘抄下来用作学习和知识沉淀。
前言
本文以某工商信息商业查询平台为例,介绍其从传统 Lambda 架构到基于 Doris Multi-Catalog 的湖仓一体架构演进历程。同时通过一系列实践,展示了如何保证数据的准确性和实时性,以及如何高效地处理和分析大规模数据,为信息服务行业提供了有价值的参考思路,有助于推动整个行业的发展和创新。架构转变,使得该平台实现了离线及实时数仓的数据入口和查询出口的统一,满足BI分析、离线计算、C端高并发等业务需求,为企业内部、产品营销、客户运营等场景提供了强大的数据洞察及价值挖掘能力。
一、架构1.0:传统Lambda架构
该商业查询平台成立之初,主要致力于 ToC 和 ToB 这两条业务线。ToC 业务线主要是为 C 端用户提供个性化、精准化的服务;而 ToB业务线侧重于为 B端客户提供数据接口服务与交付服务,同时也会处理一些公司内部数据分析需求,以数据驱动企业进行业务优化。
早期采用的是传统的Lambda架构,分为离线和实时两套处理流程。实时架构服务于对数据时效性要求更高的ToC业务线,离线架构侧重于存量数据修复与T+7,T+15的ToB数据交付服务等。该架构的优势在于项目开发灵活分段提测,并能快速响应业务需求的变化。但是在数据开发,运维管理等方面存在明显缺陷:
- 逻辑冗余:同一个业务方案需要开发离线和实时两套逻辑,代码复用率很低,这就增加了需求迭代成本和开发周期。此外,任务交接,项目管理以及架构运维的难度和复杂度也比价高,给开发团队带来比较大的挑战。
- 数据不一致:在该架构中,当应用层数据来源存在多条链路时,极易出现数据不一致问题。这些问题不仅增加了数据排查的时间,还对数据的准确性和可靠性带来了负面影响。
- 数据孤岛:在该架构中,数据分散存储在不同的组件中。比如:普通商查表存储在MySQL中,主要支持C端的高并发点查操作;对于像DimCount涉及宽表频繁变更的数据,选择Hbase的KV存储方式;对于单表数据量超过60亿的年度维度的点查,则借助GaussDB数据库实现。该方式虽然可以各自满足数据需求,但涉及组件较多且数据难以复用,极易造成数据孤岛,限制了数据的深度挖掘和利用。
除此之外,随着商业查询平台业务的不断扩展,新的业务需求不断涌现,例如需要支持分钟级灵活的人群包圈选与优惠券发放计算、订单分析与推送信息分析等新增数据分析需求。为了满足这些需求,该平台开始寻找一份能够集数据集成、查询、分析于一身的强大引擎,实现真正的All In One。
二、OLAP引擎调研
在选型调研阶段,该平台深入考察了Apache Doris、ClickHouse、Greenplum 这三款数据库。结合早期架构痛点和新的业务需求,新引擎需要具备以下能力:
- 标准SQL支持:可使用SQL编写函数,学习和使用成本较低;
- 多表联合查询能力:支持人群包即时交并差运算,支持灵活配置的人群包圈选;
- 实时Upsert能力:支持push推送日志数据的upsert操作,每天需要更新的数据量高达6亿条;
- 运维难度:架构简单,轻量化部署及运维。
根据调研结果,可以发现Apache Doris优势明显,满足该平台的选型目标:
- 多种Join逻辑:通过Colocation Join、Bucket Shuffle Join、Runtime Filter等Join优化手段,可在雪花模型的基础上进行高效的多表联合OLAP分析。
- 高吞吐Upsert写入:Unique Key模型采用了Merge-on-Write写时合并模式,支持实时高吞吐的Upsert写入,并可以保证Exactly-Once的写入语义;
- 支持Bitmap:Doris提供了丰富的Bitmap函数体系,可便捷的筛选出符合条件的ID交并集,可有效提高人群圈选的效率。
- 极简易用:Doris满足轻量化部署要求,仅有FE、BE两种进程,使得横向扩展变得简单,同时降低了版本升级的风险,更有利于维护。此外,Doris 完全兼容标准 SQL 语法,并在数据类型,函数等生态上提供了更全面的支持。
三、架构2.0:数据服务层All in Apache Doris
该商业查询平台基于 Apache Doris 升级了数据架构。首先使用 Apache Doris 替换了离线处理链路中的Hive,其次通过对Light Schema Change、Unique Key写时合并等特性的尝试与实践,仅使用Doris就取代了早期架构中 GaussDB 、HBase、Hive 三种数据库,打破了数据孤岛,实现了数据服务层 All in Doris。
- 引入Unique Key写时合并机制:为了满足大表在C端常态并发下的点查需求,通过设置多副本并采用Unique Key写时合并机制,确保了数据的实时性和一致性。基于该机制Doris成功替代了GaussDB,提供了更高效、更稳定的服务。
- 引入 Light Schema Change机制:该机制使得可以在秒级时间内完成DimCount表字段新增操作,提高了数据处理的效率,基于该机制Doris成功替代了Hbase,实现了更快速、更灵活的数据处理。
- 引入 PartialUpdate 机制 :通过 Aggregate 模型的 REPLACE_IF_NOT_NULL,加速两表关联的开发,这一改进使得多表级联开发更加高效。
Apache Doris 上线后,其业务覆盖范围迅速扩大,并在短期内就取得了显著的成果:
- 2023 年 3 月,Apache Doris 正式上线后,运行了两个集群十余台 BE,这两个集群分别负责数分团队商业分析与数据平台架构优化,共同支撑大规模的数据处理分析的重要任务,每天支撑数据量高达10亿条,计算指标达500+,支持人群包圈选,优惠券推送,充值订单分析及数据交付等需求。
- 2023 年 5 月,借助 Apache Doris 完成数分团队商业化分析集群ETL任务的流式覆写,近半离线定时调度任务迁移至Doris中,提高了离线计算任务的稳定性和时效性,同时绝大数实时任务也迁移至Apache Doris中,整体集群规模达到二十余台。
尽管架构2.0中实现了数据服务层的All in one,并且引入了Doris加速离线计算任务,但离线链路与实时链路仍然是割裂的状态,依旧面临处理逻辑重复,数据不一致的问题。其次,虽然引入Doris实现了大批量数据的实时更新与即时查询,但是对于时效性要求不高的离线任务,将其迁移至Doris集群可能会对在线业务的集群负载和稳定性产生一定的影响。因此,该平台计划进行下一步的升级改造。
四、架构 3.0:基于Doris Multi-Catalog的湖仓一体架构
考虑到Doris多源Catalog具备的数据湖分析能力,该平台决定在架构中引入Hudi作为数据更新层,这样可以将Doris作为数据统一查询的入口,对Hudi的Read Optimized 表进行查询,并采用流批一体的方式将数据写入Hudi,这样就实现了Lambda架构到Kapper架构的演进,完成了架构3.0的升级。
-
利用 Hudi 天然支持 CDC 的优势,在 ODS 层将 Hudi 作为 Queryable Kafka,实现贴源层数据接入。
-
使用Mysql作为Queryable State进行分层处理,最终结果首先会写入Mysql,再根据数据用途同步至Hudi或 Doris中。
-
对于存量数据的录入,通过自定义Flink Source实现全量数据的Exactly Once抽取至Hudi,同时支持谓词下推与状态恢复。
根据不同业务的重要程度,会将数据分别存储到Doris以及Hudi中,以最大程度地满足业务需求和性能需求:
在架构 3.0 中,该查询平台将较为沉重的离线计算嵌入到数据湖中,使Doris能够专注于应用层计算,既能有效保证湖和仓在架构上的融合统一,也可以充分发挥湖和仓各自的能力优势。这一架构的演进也推进了集群规模的进一步扩展,这一架构的演进也推进了集群规模的进一步扩展,截至目前 Doris 在该查询平台的机器规模已达数十台 ,数据探查,指标计算的维度超过200个,指标的总规模也超过了1200.
五、实践经验
某工商信息商业查询平台在 C 端查询业务中面临的核心挑战如下:
- 超大规模明细表的高并发查询:平台中存在超60亿的超大规模明细表,需要提供对该明细表的高并发查询能力
- 多维度深度分析:数据分析团队希望对 C 端数据进行多维度分析,深入挖掘更多隐藏维度及数据穿透关系,这需要强大的数据处理和灵活的数据分析能力,以便从大量数据中提取有价值的信息。
- 定制化实时看板:希望将某些固定模板的SQL定制为实时看板,并满足并发查询与分钟级数据新鲜度的要求。同时希望将实时数据看板嵌入到 C 端页面中,以增强 C 端功能性与便利性。
为应对C端提出的挑战,该平台利用了Doris的多个特性,实现了单点查询速度提升127%,批量/全量数据条件查询速度提升 193% 、开发效率提升 100% 的显著提升,此外面向 C 端的并发能力显著增强,目前可以轻松承载高达3000QPS的线上并发量。
5.1 引入Merge-on-Write,百亿级单表查询提速近三倍
为了解决年报相关表(数据量在60亿)在C端的高并发查询问题,同时实现降本增效的目标,该平台启用了Doris的Unique Key 模型的Merge-on-Write 写时合并功能。
Merge-on-Write 写时合并是 Apache Doris 在 1.2.0 版本中引入的新特性,将 Unique Key 表的数据按主键去重工作才查询阶段转移到了写入阶段,因此在查询时可以获得与Duplicate Key 表相当的性能。
具体来说,通过写时合并可以避免不必要的多路归并排序,始终保证有效的主键只出现在一个文件中(即在写入的时候保证了主键的唯一性),不需要在读取的时候通过归并排序来对主键进行去重,大大降低了CPU的计算资源消耗。同时也支持谓词下推,利用Doris 丰富的索引在数据 IO层面就能够进行充分的数据裁剪,大大减少数据的读取量和计算量,因此在很多场景的查询中都有比较明显的性能提升。
由于增加了写入流程去重的代价,写时合并的导入速度会受到一定影响,为尽可能的减少写时合并对导入性能的影响,Doris使用了以下技术对数据去重的性能进行优化,因此在实时更新场景,去重代价可以做到用户感知不明显。
- 每个文件都生成一个主键索引,用于快速定位重复数据出现的位置。
-
每个文件都会维护一个 min/max key 区间,并生成一个区间树。查询重复数据时能够快速确定给定 key 可能存在于哪个文件中,降低查找成本。
-
每个文件都维护一个 BloomFilter,当 Bloom Filter 命中时才会查询主键索引
-
通过多版本的 DeleteBitmap,来标记该文件被删除的行号
Unique Key 写时合并的使用比较简单, 只需要在表的Properties中开启即可。在此以 tableA表为例,单表数据量约 3 亿行、单行数据约 0.8KB,单表全量数据写入耗时约 5 分钟。在开启 Merge-on-Write 写时合并后,执行查询的耗时从之前的 0.45 秒降低至 0.22 s,对批量或全量数据进行条件查询时耗时从 6.5 秒降低至 2.3 秒,平均性能提升接近 3 倍。
通过这种技术手段,实现了高性能的单点查询,大大提高了查询效率和响应速度,同时降低了查询成本。这一优化措施不仅满足了用户对数据查询的高要求,还为平台的稳定性和可持续性发展提供了有力保障。
5.2 部分列数据更新,数据开发效率提升100%
在该商业查询平台的业务场景中,有一张包含企业各维度的大宽表,而平台要求任意维度的数据变更都反映到落地表。在之前开发中,需要为每个维度开发一套类似Lookup Join的逻辑,以确保每个维度的变更都可以及时更新。
但是这种做法也带来一些问题,比如每新加入一个维度时,其他维度的逻辑也需要进行调整,这增加了开发和维护的复杂性和工作量。其次,为了保持上线的灵活性,该平台并没有将所有维度合并为一张表,而是将3-5个维度拆分为一张独立的表,这种拆分方式也导致后续使用变得极为不方便。
@RequiredArgsConstructor
private static class Sink implements ForeachPartitionFunction<Row> {
private final static Set<String> DIMENSION_KEYS = new HashSet<String>() {{
add("...");
}};
private final Config config;
@Override
public void call(Iterator<Row> rowIterator) {
ConfigUtils.setConfig(config);
DorisTemplate dorisTemplate = new DorisTemplate("dorisSink");
dorisTemplate.enableCache();
// config `delete_on` and `seq_col` if is unique
DorisSchema dorisSchema = DorisSchema.builder()
.database("ads")
.tableName("ads_user_tag_commercial").build();
while (rowIterator.hasNext()) {
String json = rowIterator.next().json();
Map<String, Object> columnMap = JsonUtils.parseJsonObj(json);
// filter needed dimension columns
Map<String, Object> sinkMap = columnMap.entrySet().stream()
.filter(entry -> DIMENSION_KEYS.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
dorisTemplate.update(new DorisOneRow(dorisSchema, sinkMap));
}
dorisTemplate.flush();
}
}
为解决此类问题,该平台采用了自定义的DorisTemplate(内部封装 Stream Load),以实现对存量数据的处理。其核心思想是参考了kafka Producer的实现方式,适用map来缓存数据,并设立专门的sender线程,根据时间间隔,数据条数或数据大小定期发送数据。
通过从源端过滤出所需的列,将其写入Doris的企业信息维表中。同时,针对两表Join场景,选择用Agg模型的replace_if_not_null进行优化,使得部分列的更新工作变得更加高效。
这种改进为开发工作带来了100%的效率提升,以单表三维度举例,以前需要1天的时间开发,而现在仅仅需要0.5天,这一改变提升了开发效率,使其能够迅速的处理数据并满足业务需求。
值得一提的是,Apache Doris 在 2.0.0 版本中实现了 Unique Key 主键模型的部分列更新,在多张上游源表同时写入一张宽表时,无需由Flink进行多流Join打宽,直接写入宽表即可,减少了计算资源的消耗并大幅降低了数据处理链路的复杂性。
5.3 丰富Join的优化手段,整体查询速度最高提升近四倍
在该平台的业务场景中,超过90%的表都包含实体ID这一字段,因此对该字段创建了Colocation Group,使查询时执行计划可以命中Colocation Join,从而避免了数据shuffle带来的计算开销。与普通的Shuffle Join相比,执行速度提升了253%,极大地提高了查询效率。
对于一级维度下的某些二级维度,由于只存储了一级维度的主键ID而没有实体ID字段,如果使用传统的Shuffle Join进行查询,那么A表与B表都需要参与Shuffle操作。为了解决这个问题,该平台对查询语法进行了优化,使查询能够命中Bucket Shuffle Join,从而降低了50%以上的shuffle量,整体查询速度提升至少77%。
5.4 Light Schema Change,线上 QPS 高达3000+
为了提升C端并发能力,该平台为每个实体的每个维度都维护了一个count值。后端同学在查询数据前,会先查询该count值,只有在count值大于0的情况下,才会继续获取明细数据。为了适应维度不断扩张的迭代需求,选择采用了一套SSD存储的Hbase集群,利用其KV存储特性维护了这套count值。
而Doris Light Schema Change在面对5亿的数据量时也可以实现秒级字段增加,因此该平台在架构3.0中将DimCount程序从写Hbase迁移到了写Doris。在多副本与写时合并的功能的联合助力下,可以轻松承载高达3000的线上并发量。
当引入新的需要计算的维度时,处理流程如下:
- 将其KafkaTopic,查询SQL等信息录入Apollo配置平台
- Flink程序通过Apollo 的 Listener 检测到有新的指标,请求 Redis 分布式锁
-
查询该维度的 success_key
,
判断是否完成过初始化 -
通过
alter table
语句完成初始化,并设置 success_key -
其他 subtask 顺次执行 2-4 步骤
-
程序继续执行,新的 count 值已经可以写入
5.5 优化实时Join场景,开发成本降低 70%
实时宽表Join的痛点在于多表外键关联,比如select * from A join B on A.b_id=B.id join C on B.c_id = C.id 在实时模型构建时,A,B,C三表都有可能各自发生实时变更,若要使得结果表对每个表的变化都能进行实时响应,在 Flink 框架下有 2 种实现方式:
- A、B、C三张表,每张表都开发一套关联另外两张表额 Lookup Join逻辑。
- 设置Flink中State存储的TTL为更新时间,例如3天,这样可以保证在3天内的数据变化能够被实时感知和处理,同时,通过每日离线计算,可以保证3天前的更新能够在T+1的时间内被处理和反映在数据中。
而以上并不是最优方式,还存在一些问题:
- 随着宽表所需的子表数量不断增长,额外的开发成本和维护负担也随之线性上升。
- TTL时间的设定是一把双刃剑,设定过长会增加Flink引擎状态存储的额外开销,而设定过短则可能导致更多的数据只能享受T+1的数据新鲜度。
在之前的业务场景中,我们考虑将方案一与方案二进行结合,并进行了适当的折中。具体来说, 只开发A join B join C的逻辑,并将产出的数据首先存储在Mysql中。每当B ,C表出现数据变更时,通过JDBC查询结果表来获取所有发生变化的A表ID,并据此重新进行计算。
然而在引入Doris之后,通过写时合并,谓词下推,命中索引以及高性能的Join策略等技术,为该平台提供了一种查询时的现场关联方式,这不仅降低了开发的复杂度,还在三表关联的场景下,由原先需要的3人天的工作量降低为1人天,开发效率得到极大提升。
六、优化经验
在生产实践的过程中,该平台也遇到了一些问题、包括文件版本产生过多、事务挤压、FE 假死等问题报错。然而,通过参数调整和方案调试,最终解决了这些问题,以下是优化经验总结。
6.1 E-235(文件版本过多)
在凌晨调度Broker Load时,由于调度系统任务挤占可能会导致同时并发多个任务,使得BE流量剧增,造成IO抖动,产生文件版本过多的问题(E-235)。因此对此进行了以下优化,通过这些改动E-235问题未再发生:
- 使用Stream Load替代 Broker Load,将BE流量分摊到全天。
- 自定义写入器包装Stream Load, 实现异步缓存,限流削峰等效果,充分保证数据写入的稳定性。
-
优化系统配置,调整 Compaction 和写入 Tablet 版本的相关参数:
- max_base_compaction_threads : 4->8
- max_cumu_compaction_threads : 10->16
- compaction_task_num_per_disk : 2->4->8
- max_tablet_version_num : 500->1024
6.2 E-233(事务挤压)
在深入使用Doris的过程中,发现在多个 Stream Load 同时写入数据库时,如果上游进行多表数据清洗并且限速难以把控时,可能会出现QPS较大的问题,进而触发 E-233 报错。为了解决这个问题,该平台进行了以下调整,调整之后在面对实时写入 300+ 表时, 再未复现 E-233 问题:
将 DB 中的表进行更细致的分库,以实现每个 DB 的事务分摊
参数调整:max_running_txn_num_per_db : 100->1000->2048
6.3 FE 假死
通过Grafana监控发现,FE经常出现宕机现象,主要原因是因为早期该平台采用FE和BE混合部署的方式,当BE进行网络IO传输的时,可能会挤占机器FE的IO与内存。其次,因运维团队的 Prometheus 对接的服务比较多,其稳定性和健壮性不足,从而造成假象告警。为了解决这些问题,做了以下调整:
- 当前BE机器的内存是128G, 使用 32G 的机器将 FE 节点迁出。
- Stream Load的过程中,Doris会选定一个BE节点作为Coordinator 节点,用于接受数据并分发给其他BE节点,并将最终导入结果返回给用户。用户可通过HTTP 协议提交导入命令至FE或直接指定BE节点。该平台采用直接指定BE的方式,实现负载均衡,减少FE在Stream Load 中的参与,以降低 FE 压力。
- 定时调度 show processlist 命令进行探活, 同时及时 Kill 超时 SQL 或者超时连接。
七、未来规划
截止目前,基于 Doris 的数据平台已经满足该商业查询平台在实时与离线的统一写入与查询,支持了 BI 分析、离线计算、C 端高并发等多个业务场景,为产品营销、客户运营、数据分析等场景提供数据洞察能力与价值挖掘能力。未来,该商业查询平台还计划进行以下升级与优化:
- 版本升级:升级 Apache Doris 2.0 版本,更进一步实现高并发点查和部分列更新等最新特性,进一步优化现有架构,为查询提效。
- 规模扩大:进一步扩大集群规模,并将更多的分析计算迁移至Doris中。
- 日志分析:随着节点数越来越多,日志数据也在不断产生,未来该平台计划将集群日志接入到 Doris 中统一收集管理和检索,便于问题的提示探查,因此倒排索引和日志分析也是后面重要的拓展场景。
- 自动化运维:在某些特定查询场景下,可能会导致集群 BE 节点宕机,虽然出现概率较低,但手动启动仍然比较满发,后续将引入自动重启能力,使节点能够快速恢复并重新投入运行。
- 提升数据质量:目前该平台大部分时间专注于业务的实现上,数据入口的统一收束和补齐,数据质量的监控还存在短板,所以希望可以在这方面提升数据质量。
参考文章:
Apache Doris 在某工商信息商业查询平台的湖仓一体建设实践