火山引擎DataLeap基于Apache Atlas自研异步消息处理框架

news2025/1/13 13:42:24

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群

 

字节数据中台DataLeap的Data Catalog系统通过接收MQ中的近实时消息来同步部分元数据。Apache Atlas对于实时消息的消费处理不满足性能要求,内部使用Flink任务的处理方案在ToB场景中也存在诸多限制,所以团队自研了轻量级异步消息处理框架,支持了字节内部和火山引擎上同步元数据的诉求。本文定义了需求场景,并详细介绍框架的设计与实现。

背景

字节数据中台DataLeap的Data Catalog系统基于Apache Atlas搭建,其中Atlas通过Kafka获取外部系统的元数据变更消息。在开源版本中,每台服务器支持的Kafka Consumer数量有限,在每日百万级消息体量下,经常有长延时等问题,影响用户体验。在2020年底,火山引擎DataLeap研发人员针对Atlas的消息消费部分做了重构,将消息的消费和处理从后端服务中剥离出来,并编写了Flink任务承担这部分工作,比较好的解决了扩展性和性能问题。然而,到2021年年中,团队开始重点投入私有化部署和火山公有云支持,对于Flink集群的依赖引入了可维护性的痛点。在仔细的分析了使用场景和需求,并调研了现成的解决方案后,火山引擎DataLeap研发人员决定投入人力自研一个消息处理框架。当前这个框架很好的支持了字节内部以及ToB场景中Data Catalog对于消息消费和处理的场景。本文会详细介绍框架解决的问题,整体的设计,以及实现中的关键决定。

需求定义

使用下面的表格将具体场景定义清楚。

需求维度

需求描述

吞吐量

每日百万级别,每秒峰值>100

服务质量(QoS)

至少一次

延迟消息

支持将消息标记为延迟处理,最高延迟1 min

重试

自动对处理失败消息重试,重试次数可定义

并行与顺序处理

Partition内部支持按照某个Key重新分组,不同Key之间接受并行,同一个Key要求顺序处理

消息处理时间

不同类型的消息,处理时间会有较大差别,从<1s~1min

封装

确保不丢消息的前提下,依赖框架做Offset的提交,业务侧只需要编写消息的处理逻辑;另外,将系统状态以Metric方式暴露

轻量

支持与后端服务混合部署,不引入额外的维护成本

相关工作

在启动自研之前,火山引擎DataLeap研发团队评估了两个比较相关的方案,分别是Flink和Kafka Streaming。Flink是团队之前生产上使用的方案,在能力上是符合要求的,最主要的问题是长期的可维护性。在公有云场景,那个阶段Flink服务在火山云上还没有发布,内部自己的服务又有严格的时间线,所以必须考虑替代;在私有化场景,火山引擎DataLeap研发团队不确认客户的环境一定有Flink集群,即使部署的数据底座中带有Flink,后续的维护也是个头疼的问题。另外一个角度,作为通用流式处理框架,Flink的大部分功能其实团队并没有用到,对于单条消息的流转路径,其实只是简单的读取和处理,使用Flink有些“杀鸡用牛刀”了。另外一个比较标准的方案是Kafka Streaming。作为Kafka官方提供的框架,对于流式处理的语义有较好的支持,也满足团队对于轻量的诉求。最终没有采用的主要考虑点是两个:

  • 对于Offset的维护不够灵活:内部的场景不能使用自动提交(会丢消息),而对于同一个Partition中的数据又要求一定程度的并行处理,使用Kafka Streaming的原生接口较难支持。

  • 与Kafka强绑定:大部分场景下,团队不是元数据消息队列的拥有者,也有团队使用RocketMQ等提供元数据变更,在应用层,团队希望使用同一套框架兼容。

设计

概念说明

  • MQ Type:Message Queue的类型,比如Kafka与RocketMQ。后续内容以Kafka为主,设计一定程度兼容其他MQ。

  • Topic:一批消息的集合,包含多个Partition,可以被多个Consumer Group消费。

  • Consumer Group:一组Consumer,同一Group内的Consumer数据不会重复消费。

  • Consumer:消费消息的最小单位,属于某个Consumer Group。

  • Partition:Topic中的一部分数据,同一Partition内消息有序。同一Consumer Group内,一个Partition只会被其中一个Consumer消费。

  • Event:由Topic中的消息转换而来,部分属性如下。

    • Event Type:消息的类型定义,会与Processor有对应关系;

    • Event Key:包含消息Topic、Partition、Offset等元数据,用来对消息进行Hash操作;

  • Processor:消息处理的单元,针对某个Event Type定制的业务逻辑。

  • Task:消费消息并处理的一条Pipeline,Task之间资源是相互独立的。

框架架构

 

整个框架主要由MQ Consumer, Message Processor和State Manager组成。

  • MQ Consumer:负责从Kafka Topic拉取消息,并根据Event Key将消息投放到内部队列,如果消息需要延时消费,会被投放到对应的延时队列;该模块还负责定时查询State Manager中记录的消息状态,并根据返回提交消息Offset;上报与消息消费相关的Metric。

  • Message Processor:负责从队列中拉取消息并异步进行处理,它会将消息的处理结果更新给State Manager,同时上报与消息处理相关的Metric。

  • State Manager:负责维护每个Kafka Partition的消息状态,并暴露当前应提交的Offset信息给MQ Consumer。

下一篇将分享此异步消息框架的实现过程以及线上运维case举例。

实现

线程模型

 

每个Task可以运行在一台或多台实例,建议部署到多台机器,以获得更好的性能和容错能力。每台实例中,存在两组线程池:

  • Consumer Pool:负责管理MQ Consumer Thread的生命周期,当服务启动时,根据配置拉起一定规模的线程,并在服务关闭时确保每个Thread安全退出或者超时停止。整体有效Thread的上限与Topic的Partition的总数有关。

  • Processor Pool:负责管理Message Processor Thread的生命周期,当服务启动时,根据配置拉起一定规模的线程,并在服务关闭时确保每个Thread安全退出或者超时停止。可以根据Event Type所需要处理的并行度来灵活配置。

两类Thread的性质分别如下:

  • Consumer Thread:每个MQ Consumer会封装一个Kafka Consumer,可以消费0个或者多个Partition。根据Kafka的机制,当MQ Consumer Thread的个数超过Partition的个数时,当前Thread不会有实际流量。

  • Processor Thread:唯一对应一个内部的队列,并以FIFO的方式消费和处理其中的消息。

StateManager

 

在State Manager中,会为每个Partition维护一个优先队列(最小堆),队列中的信息是Offset,两个优先队列的职责如下:

  • 处理中的队列:一条消息转化为Event后,MQ Consumer会调用StateManager接口,将消息Offset 插入该队列。

  • 处理完的队列:一条消息处理结束或最终失败,Message Processor会调用StateManager接口,将消息Offset插入该队列。

MQ Consumer会周期性的检查当前可以Commit的Offset,情况枚举如下:

  • 处理中的队列堆顶 < 处理完的队列堆顶或者处理完的队列为空:代表当前消费回来的消息还在处理过程中,本轮不做Offset提交。

  • 处理中的队列堆顶 = 处理完的队列堆顶:表示当前消息已经处理完,两边同时出队,并记录当前堆顶为可提交的Offset,重复检查过程。

  • 处理中的队列堆顶 > 处理完的队列堆顶:异常情况,通常是数据回放到某些中间状态,将处理完的队列堆顶出堆。

注意:当发生Consumer的Rebalance时,需要将对应Partition的队列清空

KeyBy与Delay Processing的支持

因源头的Topic和消息格式有可能不可控制,所以MQ Consumer的职责之一是将消息统一封装为Event。

根据需求,会从原始消息中拼装出Event Key,对Key取Hash后,相同结果的Event会进入同一个队列,可以保证分区内的此类事件处理顺序的稳定,同时将消息的消费与处理解耦,支持增大内部队列数量来增加吞吐。

Event中也支持设置是否延迟处理属性,可以根据Event Time延迟固定时间后处理,需要被延迟处理的事件会被发送到有界延迟队列中,有界延迟队列的实现继承了DelayQueue,限制DelayQueue长度, 达到限定值入队会被阻塞。

异常处理

Processor在消息处理过程中,可能遇到各种异常情况,设计框架的动机之一就是为业务逻辑的编写者屏蔽掉这种复杂度。Processor相关框架的逻辑会与State Manager协作,处理异常并充分暴露状态。比较典型的异常情况以及处理策略如下:

  • 处理消息失败:自动触发重试,重试到用户设置的最大次数或默认值后会将消息失败状态通知State Manager。

  • 处理消息超时:超时对于吞吐影响较大,且通常重试的效果不明显,因此当前策略是不会对消息重试,直接通知State Manager 消息处理失败。

  • 处理消息较慢:上游Topic存在Lag,Message Consumer消费速率大于Message Processor处理速率时,消息会堆积在队列中,达到队列最大长度,Message Consumer 会被阻塞在入队操作,停止拉取消息,类似Flink框架中的背压。

监控

为了方便运维,在框架层面暴露了一组监控指标,并支持用户自定义Metrics。其中默认支持的Metrics如下表所示:

监控类别

监控指标

Message Consumer

Consumer Lag

Rebalance rate

Deserialize QPS

Consumer heartbeat

Message Enqueue Time

Message Processor

Process QPS

Process time

Internal Queue

Queue length

线上运维case举例

实际生产环境运行时,偶尔需要做些运维操作,其中最常见的是消息堆积和消息重放。

对于Conusmer Lag这类问题的处理步骤大致如下:

  • 查看Enqueue Time,Queue Length的监控确定服务内队列是否有堆积。

  • 如果队列有堆积,查看Process Time指标,确定是否是某个Processor处理慢,如果是,根据指标中的Tag 确定事件类型等属性特征,判断业务逻辑或者Key设置是否合理;全部Processor 处理慢,可以通过增加Processor并行度来解决。

  • 如果队列无堆积,排除网络问题后,可以考虑增加Consumer并行度至Topic Partition 上限。

消息重放被触发的原因通常有两种,要么是业务上需要重放部分数据做补全,要么是遇到了事故需要修复数据。为了应对这种需求,我们在框架层面支持了根据时间戳重置Offset的能力。具体操作时的步骤如下:

  • 使用服务测暴露的API,启动一台实例使用新的Consumer GroupId: {newConsumerGroup} 从某个startupTimestamp开始消费

  • 更改全部配置中的 Consumer GroupId 为 {newConsumerGroup}

  • 分批重启所有实例

总结

为了解决字节数据中台DataLeap中Data Catalog系统消费近实时元数据变更的业务场景,团队自研了轻量级消息处理框架。当前该框架已在字节内部生产环境稳定运行超过1年,并支持了火山引擎上的数据地图服务的元数据同步场景,满足了团队的需求。

下一步会根据优先级排期支持RocketMQ等其他消息队列,并持续优化配置动态更新,监控报警,运维自动化等方面。

点击跳转大数据研发治理套件 DataLeap了解更多

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/927204.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mysql索引、事务与存储引擎 (索引)

一、索引 1、索引的概念&#xff1a; 索引就是一种帮助系统能够更加快速的查找信息的数据结构。 2.索引的作用&#xff1a; ①数据库利用各种快速定位技术&#xff0c;能够大大加快查询速度&#xff0c;这是创建索引的最主要的原因。 ②当表很大或查询涉及到多个表时&#xff0…

Linux安装软件每次靠百度,这次花了些时间,终于算是搞明白了

Linux下安装命令虽然经常使用&#xff0c;但也仅仅是会使用&#xff0c;每次再用时依然的百度 。于是就花了些时间整体的梳理了一番&#xff0c;以便于更好的理解。 1.安装流程介绍 在Linux下安装软件&#xff0c;其实也是遵循着和Windows一样的安装流程。 首先&#xff0c;…

巨人互动|Facebook海外户Facebook游戏全球发布实用策略

Facebook是全球最大的社交媒体平台之一&#xff0c;拥有庞大的用户基数和广阔的市场。对于游戏开发商而言&#xff0c;利用Facebook进行全球发布是一项重要的策略。下面小编将介绍一些实用的策略帮助开发商在Facebook上进行游戏全球发布。 巨人互动|Facebook海外户&Faceboo…

淘宝API技术解析,实现按图搜索淘宝商品

淘宝提供了开放平台接口&#xff08;API&#xff09;来实现按图搜索淘宝商品的功能。您可以通过以下步骤来实现&#xff1a; 1. 获取开放平台的访问权限&#xff1a;首先&#xff0c;您需要在淘宝开放平台创建一个应用&#xff0c;获取访问淘宝API的权限。具体的申请步骤和要求…

1.6 服务器处理客户端请求

客户端进程向服务器进程发送一段文本&#xff08;MySQL语句&#xff09;&#xff0c;服务器进程处理后再向客户端进程发送一段文本&#xff08;处理结果&#xff09;。 从图中我们可以看出&#xff0c;服务器程序处理来自客户端的查询请求大致需要经过三个部分&#xff0c;分别…

前端需要理解的 React 知识

1 框架通识 1.1 MVVM、MVC和MVP MVC、MVP 和 MVVM 是三种常见的软件架构设计模式。主要通过分离关注点的方式来组织代码结构&#xff0c;优化开发效率。 MVC将应用抽象为数据层&#xff08;Model&#xff09;、视图层&#xff08;View&#xff09;、逻辑层&#xff08;contr…

解锁Selenium的力量:不仅仅是Web测试

Selenium简介 Selenium&#xff0c;作为Web应用测试的领军者&#xff0c;已经成为了无数开发者和测试人员的首选工具。它不仅仅是一个自动化测试工具&#xff0c;更是一个强大的Web应用交互框架。 Selenium的起源与发展 Selenium的历史可以追溯到2004年&#xff0c;由Jason Hu…

二叉树、红黑树、B树、B+树

二叉树 一棵二叉树是结点的一个有限集合&#xff0c;该集合或者为空&#xff0c;或者是由一个根节点加上两棵别称为左子树和右子树的二叉树组成。 二叉树的特点&#xff1a; 每个结点最多有两棵子树&#xff0c;即二叉树不存在度大于2的结点。二叉树的子树有左右之分&#xf…

【算法刷题之哈希表(2)】

目录 1.leetcode-454. 四数相加 II2.leetcode-383. 赎金信&#xff08;1&#xff09;暴力解法&#xff08;2&#xff09;哈希法 3.leetcode-205. 同构字符串&#xff08;1&#xff09;哈希法&#xff08;2&#xff09;直接对比查找 4.leetcode-128. 最长连续序列5.总结 1.leetc…

抖音小程序商城开发制作源码 含多套模板+部署搭建教程

分享一个抖音小程序商城的制作源码&#xff0c;含多套模板、模块化自由DIY功能和完整的搭建部署教程。程序支持除抖音小程序商城制作外&#xff0c;还支持一键同步微信、支付宝、百度、今日头条端小程序。 抖音小程序商城的基本架构包括前端页面和后端管理平台两部分。前端页面…

【FPGA】FPGA入门 —— 基本开发流程

FPGA入门 1. FPGA入门2. FPGA开发流程3. 二选一多路器 - 快速熟悉开发环境及流程 1. FPGA入门 快速上手verilog语法状态机&#xff0c;线性序列机FPGA常见的设计方法自己写代码&#xff0c;下载代码进行使用&#xff0c;使用厂家/第三方提供的IP核常见接口设计 等等。。 学习…

白介素对NK细胞功能的影响(IL-1β、IL-12、IL-15、IL-18、IL-21)

1、促进NK细胞扩增和活化&#xff1a;IL-2/21 Soiffer RJ等自1996年起即报道IL-2低剂量持续输注和间歇给药对转移癌患者的CD56NK细胞有明显扩增效果。大部分NK细胞表面具有IL-2中亲和性受体&#xff0c;IL-2诱导NK的杀伤活性约需18&#xff5e;24小时。此外&#xff0c;IL-2还…

Docker安装Jenkins实操记录

前置条件&#xff1a; 1、安装了docker 2、安装了java&#xff08;没有安装情况下&#xff0c;可运行&#xff1a;yum install -y java-1.8.0-openjdk-devel.x86_64&#xff09; 一、拉取镜像 1、docker pull jenkins/jenkins 2、mkdir -p /usr/local/jenkins 3、chmod 777 …

模型崩溃,ChatGPT变“笨”了?最新评估结果揭示真相原因

​ChatGPT性能是否变化&#xff1f; 人们可能会好奇&#xff0c;像ChatGPT这样的AI系统是否会因为太聪明而最终无法被人类所驾驭使用。但是&#xff0c;最近的一项研究表明&#xff0c;ChatGPT正在变得越来越糟糕。[1] OpenAI的ChatGPT帮助了无数人更高效地使用互联网。无论是…

php 多维数组排序,根据某一列排序(array_multisort()和array_column()联用)

array_multisort()和array_column()联用效果直接叠满,11>100 先来看下两个函数的介绍和用法 array_column(): 一般模式,不需要其中字段作为id,只需要提取val值 <?php // 可能从数据库中返回数组 $a [[id > 5698, first_name > Peter, last_name > G…

【AndroidStudio】屏蔽小米打印

使用小米手机调试时&#xff0c;会一直有notifyQueue load error的打印 在过滤器重添加过滤条件即可 -message:notifyQueue

ISO-16750-1,2,3,4,5_2023 道路车辆 — 电气和电子设备的环境条件和测试 ,标准汇总

目录 一、ISO 16750标准各Part部分当前状态&#xff1a; ISO 16750-2023 合集1-5包下载&#xff1a;https://download.csdn.net/download/std7879/88251235 二、ISO 16750标准各Part部分描述的内容&#xff1a; ISO 16750-1:2023Part 1: General概述 ISO 16750-2:2023 Part…

8路模拟信号采集FMC子卡模块推荐哪些?

FMC168是一款基于VITA57.4标准的2GSPS/2.6GSPS/3GSPS采样率14位分辨率Double FMC子卡模块&#xff0c;该模块可以实现8路14-bit、2GSPS/2.6GSPS/3GSPS采样率模拟信号采集。该板卡ADC器件采用ADI公司的AD9208芯片,该芯片与AD9689完全兼容&#xff0c;可以实现不同的采样率范围。…

微信小程序客服系统-两种形式:嵌入页面传递更多信息 与 自带组件形式

微信小程序对接有两种方式&#xff1a;webview组件嵌入页面&#xff0c;小程序客服组件对接消息 使用webview组件嵌入聊天页面形式。这种形式更加的灵活可控&#xff0c;可以传递更多的信息给到客服&#xff0c;例如可以把用户的手机号&#xff0c;所在页面的产品信息等带入进来…

Dubbo3之SerializingExecutor

前言 Dubbo3 提供了一个挺有意思的 Executor&#xff0c;用来将提交到线程池里的任务按顺序串行执行。 需求背景&#xff1a;你有一个线程池&#xff0c;但是你不想修改它&#xff0c;现在你的需求是要把提交上去的任务按顺序串行执行。 在这样一个需求背景下&#xff0c;Ser…