一年省七位数,得物自建HFDS在 Flink Checkpoint 场景下的应用实践

news2024/11/18 3:20:24

1 背景

随着Flink实例的迁移下云以及新增需求接入,自建Flink平台规模逐渐壮大,当前总计已超4万核运行在自建的K8S集群中,然而 Flink 任务数的增加,特别是大状态任务,每次Checkpoint 时会产生脉冲式带宽占用,峰值流量超过100Gb/s,早期使用OSS作为Checkpoint数据存储,单个Bucket 每 1P数据量只有免费带宽10Gb/s,超出部分单独计费,当前规模每月需要增加1x w+/月。

为了控制这部分成本,得物开展了自建HDFS在Flink Checkpoint场景下的落地工作,实现年度成本节省xxx万元。

此次分享自建HDFS在实时计算checkpoint场景的实践经验,希望能为读者提供一些参考。

2 Flink Checkpoint 介绍

2.1 Flink里的Checkpoint是什么?

Checkpoint:简单的说,在某一时刻,将 Flink 任务本地机器中存储在状态后端的状态去同步到远程文件存储系统(比如 HDFS)的过程就叫 Checkpoint。

状态后端:做状态数据持久化的工具就叫做状态后端。比如你在 Flink 中见到的 RocksDB、FileSystem 的概念就是指状态后端,再引申一下,也可以理解为:应用中有一份状态数据,把这份状态数据存储到 MySQL 中,这个 MySQL 就能叫做状态后端。

2.2 Checkpoint解决什么问题?

其实在实时计算中的状态的功能主要体现在任务可以做到失败重启后没有数据质量、时效问题。

实时任务一般都是 7x24 小时 Long run 的,挂了之后,就会有以下两个问题,首先给一个实际场景:一个消费上游 Kafka,使用 Set 去重计算 DAU 的实时任务。

数据质量问题:当这个实时任务挂了之后恢复,Set空了,这时候任务再继续从上次失败的 Offset 消费 Kafka 产出数据,则产出的数据就是错误数据了

数据时效问题:一个实时任务,产出的指标是有时效性(主要是时延)要求的。你可以从今天 0 点开始重新消费,但是你回溯数据也是需要时间的。举例:中午 12 点挂了,实时任务重新回溯 12 个小时的数据能在 1 分钟之内完成嘛?大多数场景下是不能的!一般都要回溯几个小时,这就是实时场景中的数据时效问题。

而 Flink的Checkpoint就是把 Set 定期的存储到远程 HDFS 上,当任务挂了,我们的任务还可以从 HDFS 上面把这个数据给读回来,接着从最新的一个 Kafka Offset 继续计算就可以,这样即没有数据质量问题,也没有数据时效性问题。

2.3 Checkpoint的运行流程?

  1. JM 定时调度 Checkpoint 的触发,接受到 JM 做 Checkpoint 的请求后,开始做本地 Checkpoint,暂停处理新流入的数据,将新数据缓存起来。

  2. 将任务的本地状态数据,复制到一个远程的持久化存储(HDFS)空间上。

  3. 继续处理新流入的数据,包括刚才缓存起来的数据。

3 自建HDFS引入

3.1 为什么用HDFS?

Flink 做为一个成熟的流计算引擎,对外宣称可以实现 Exactly Once。为了实现业务上的 Exactly Once,Flink 肯定不能丢数据,也就是状态数据必须保障高可靠性,而HDFS作为是一个分布式文件系统,具备高容错率、高吞吐量等特性,是业界使用最广泛的开源分布式文件系统,针对大状态的Checkpoint任务非常契合,带宽易扩展且成本低廉。

HDFS主要有如下几项特点:

  • 和本地文件系统一样的目录树视图

  • Append Only 的写入(不支持随机写)

  • 顺序和随机读

  • 超大数据规模

  • 易扩展,容错率高

3.2 得物自建HDFS架构

架构层面是典型的主从结构,架构见下图,核心思想是将文件按照固定大小进行分片存储,

  • 主节点:称为 NameNode,主要存放诸如目录树、文件分片信息、分片存放位置等元数据信息

  • 从节点:称为 DataNode,主要用来存分片数据

比如用户发出了一个1GB的文件写请求给HDFS客户端,HDFS客户端会根据配置(默认是128MB),对这个文件进行切分,HDFS客户端会切分成8个Block,然后询问NameNode应该将这些切分好的Block往哪几台DataNode上写,此后client端和NameNode分配的多个DataNode构成pipeline管道,开始以packet为单位向Datanode写数据。

4 自建HDFS落地实践

4.1 集群规划

早期使用OSS的主要瓶颈在于带宽,为了匹配将大状态的任务从OSS迁移到Hdfs带宽需求,支撑写入流量100Gib+/s,对比OSS的带宽成本,结合到成本与带宽瓶颈考虑,内部大数据d2s.5xlarge机型做了一次性能压测,单节点吞吐能达到12Gib/s,按100Gib/s预估,算上Buffer,3副本集群需要xx台机器,满足现在的带宽及写入吞吐需求,最终选择d2s.5xlarge类型Ecs机器,对应实例详情如下:

4.2 稳定性保障建设

4.2.1 Hdfs组件指标采集

为了确保HDFS集群的稳定和可靠性,支撑线上实时Flink任务Checkpoint,监控告警建设是必不可少的,我们通过统一的采集程序Hadoop Exporter将集群里各组件的JMX信息换为维度模型,将下述为扁平化的事实指标Jmx数据,转换为维度结构,比如针对NameNode、DataNode,可以直接将指标使用预定义维度,例如:cluster、instance等维度,并存储到Prometheus能够识别的指标数据,存储为一个二维字典结构,例如: _hadoop_namenode_metrics[指标分类(通常是MBean的名称)][指标名称]

4.2.2 指标采集架构

结合当前集群的规模,我们通过集中是Pull的方式采集架构,只需要启动时指定集群Namenode及Jn的Jmx的url信息,就能采集集群的所有组件的指标信息,这样当有集群扩展或变更时,会自动采集上报到apm里,方便运维,具体采集架构如下图:

4.2.3 监控与告警

监控:基于已采集汇报上的指标数据,目前配置了Namenode、Datanode组件核心指标监控大盘,包括HDFS节点健康状态、HDFS服务健康状态、数据块健康状态、节点的写入吞吐量等指标。

告警:当前监控数据已完成接入公司天眼监控平台,我们将影响hdfs服务可用性的指标统一配置了告警模版,比如集群总的写入带宽、Callqueue队列、DN存活数量、集群节点基础io值班等,可以动态覆盖多集群,实现定制化告警,更加灵活及方便感知问题,减少故障止损时长,满足线上HDFS稳定性保障SLA目标。

4.2.4 集群快速变更能力

随着Hdfs集群规模的增加,在日常运维过程中,如何做到快速扩、缩容、节点重启及配置变更能力,保障集群具备快速止损的能力,我们封装了一整套HDFS的各组件变更能力,包括节点自动上报到cmdb对应应用、集群数据节点maintenance模式快速无影响重启、日常变配等,并集成到ansible playbook,做到集群扩容在分钟级完成。

4.3 迁移到HDFS攻克难关

4.3.1 DN 心跳汇报于删除共用一把写锁问题

现象:自建Flink平台大部分大状态任务迁移后,自建HDFS集群节点整体的水位各个ecs的网络带宽峰值,出现偶发部分任务因checkpiont 写入失败问题,报错信息如下:

问题定位过程:

  1. 根据客户端日志的堆栈信息,查看Namenode的日志找到对应的文件、块,发现了错误日志,文件块在写入成功后不能及时上报,块的状态一直处于not COMPLETE。

这里介绍下Hdfs文件写入流程介绍:

  • 客户端向datanode写入块结束后,datanode通过IBR(增量块汇报)向namenode汇报新写入的块

  • namenode收到汇报后更新文件的块副本数,当文件块副本数>=1时,文件写入状态为COMPLETE

  • 客户端写入结束后不断向namenode询问文件写入状态是否COMPLETE,失败5(默认)次后报错写入失败。

  1. 根据上述写入流程,怀疑问题出现在IBR阶段,查看Namenode监控指标,Namenode处理块汇报平均时长<10ms,所以猜测问题出在Datanode端,观察发现,Datanode偶发心跳汇报间隔>30s(正常3s一次),Datanode IBR和心跳都是BPServiceActor线程处理,很可能是心跳阻塞了IBR。

  1. 我们根据猜测的方向,继续定位什么原因导致心跳阻塞了IBR汇报,于是在每台节点上,部署了脚本(见下图),根据Datanode的Jmx指标监听本节点心跳间隔,大于10s时就打印Datanode的Jstack。

Datanode 每个节点上的metric信息里包含心跳汇报间隔的数据。

  1. 分析多个Jstack代码(具体内容见下),可以发现BPServiceActor线程被CommandProcessingThread线程阻塞,而CommandProcessingThread线程在调用invalidate()方法,而invalidate()是在调用删除操作。
"BP-1732625734-****-1675758643065 heartbeating to ****:8020" #56 daemon prio=5 os_prio=0 tid=0x00007f8fc6417800 nid=0x77e0 waiting on condition [0x00007f8f933f5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.writeLock(BPOfferService.java:118)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.updateActorStatesFromHeartbeat(BPOfferService.java:570)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:699)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:879)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None
        
"Command processor" #54 daemon prio=5 os_prio=0 tid=0x00007f8fc640f800 nid=0x77de runnable [0x00007f8f935f7000]
   java.lang.Thread.State: RUNNABLE
        at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
        at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)
        at java.io.File.isDirectory(File.java:858)
        at java.io.File.toURI(File.java:741)
        at org.apache.hadoop.hdfs.server.datanode.LocalReplica.getBlockURI(LocalReplica.java:256)
        at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2133)
        at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2099)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActive(BPOfferService.java:738)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActor(BPOfferService.java:684)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processCommand(BPServiceActor.java:1359)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.lambda$enqueue$2(BPServiceActor.java:1405)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread$$Lambda$75/2086554487.run(Unknown Source)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processQueue(BPServiceActor.java:1332)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.run(BPServiceActor.java:1315)

   Locked ownable synchronizers:
        - <0x00000007204cf938> (a java.util.concurrent.locks.ReentrantReadWriteLock$FairSync)
        - <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)

结合堆栈信息定位到代码,确实发现processCommandFromActor方法在执行删除(调用invalidate()方法)操作时与心跳汇报updateActorStatesFromHeartbeat方法共用同一把写锁。

class BPOfferService {
private final Lock mWriteLock = mReadWriteLock.writeLock();
void writeLock() {
  mWriteLock.lock();
}

void writeUnlock() {
  mWriteLock.unlock();
}

void updateActorStatesFromHeartbeat(
    BPServiceActor actor,
    NNHAStatusHeartbeat nnHaState) {
  writeLock();
  try {
//... 心跳汇报
  } finally {
    writeUnlock();
  }
}
boolean processCommandFromActor(DatanodeCommand cmd,
    BPServiceActor actor) throws IOException {
  assert bpServices.contains(actor);
// ...省略
  writeLock();
  try {
//...执行删除逻辑
  } finally {
    writeUnlock();
  }
}
}

  1. 确认问题:查看Namenode审计日志发现,集群持续有大量文件删除(Flink删除过期Checkpoint meta文件)操作,修改Datanode端代码,在调用processCommandFromActive方法超过一定10s后打印调用时长与CommandAction日志。查看datanode日志发现确实存在删除操作大于30s的情况,由此确认问题就是出现在删除操作耗时过长影响了Datanode的增量块汇报。

由此确定问题:

删除块操作耗时过长,阻塞datanode心跳,导致IBR被阻塞,块写入成功后不能及时上报,客户端重试一定次数后失败抛异常,重试次数由dfs.client.block.write.locateFollowingBlock.retries决定,默认5次,第一次等待0.4s,之后每次等待时长翻倍,5次约为15s左右。

问题解决方案

找到问题就是出现在BPServiceActor 线程做了太多的事,包含FBR、IBR、心跳汇报,而且心跳汇报和删除共同持有一把写锁,那解决方案一个就把这两把锁进行拆分,一个就是将IBR逻辑单独独立出来,不受心跳汇报影响。

而社区3.4.0版本已经将IBR从BPServiceActor 线程独立出来了,所有我们最终将HDFS-16016 patch 合并到自建Hdfs3.3.3版本中,IBR不会被invalidate()阻塞,问题得到根治!

5 总结与规划

总结:Oss的流量已从早期137Gib/s降低到30Gib/s左右(下图一),自建Hdfs集群峰值流量达到120Gb/s(下图二),且平稳运行

整个项目已完成全部大状态任务从Oss迁移到自建Hdfs,当前Hdfs集群规模xx台,成本x w/月,原OSS带宽费用报价1x w/月,相比节省xx w/月。

未来规划:对于全量 checkpoint 来说,TM 将每个 Checkpoint 内部的数据都写到同一个文件,而对于 RocksDBStateBackend 的增量 Checkpoint 来说,则会将每个 sst 文件写到一个分布式系统的文件内,当作业量很大,且作业的并发很大时,则会对底层 HDFS 形成非常大的压力,

1)大量的 RPC 请求会影响 RPC 的响应时间。

2)大量文件对 NameNode 内存造成很大压力。

针对上面的问题我们未来考虑引入小文件合并方案降低 HDFS 的压力,包括 RPC 压力以及 NameNode 内存的压力。

*文/希贤


线下活动推荐: 得物技术沙龙「企业协作效率演进之路」(总第19期)
时间:2023年7月16日 14:00 ~ 2023年7月16日 18:00
地点:(上海杨浦)黄兴路221号互联宝地 C栋5楼(宁国路地铁站1号口出)

活动亮点:在当今竞争日益激烈的商业环境中,企业协作效率成为企业团队成功的关键。越来越多的企业意识到,通过信息化建设和工具化的支持,可以大幅提升协作效率,并在行业中取得突破。本次沙龙将涵盖多个主题,这些主题将为与会者提供丰富的思考和经验,助力企业协作效率的提升。

通过得物技术沙龙这个交流平台,您将有机会与其他企业的代表一起学习、借鉴彼此的经验和做法。共同探讨企业内部协作效率的最佳实践,驱动企业长期生存和发展。加入得物技术沙龙,与行业先驱者们一起开启协作效率的新篇章!让我们共同为协作效率的突破而努力!

点击报名: 得物技术沙龙「企业协作效率演进之路」(总第19期)
本文属得物技术原创,来源于:得物技术官网
未经得物技术许可严禁转载,否则依法追究法律责任!

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/695955.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

统计项目代码行数工具cloc

Ubuntu用户 使用cloc在ubuntu内统计代码行数 安装cloc工具 sudo apt-get install cloc进入需要统计的目录内&#xff0c;然后执行 cloc .然后就会显示文件目录中的文件数(files)、空白行数(blank)、注释行数(comment)和代码行数(code)。 Windows 用户 也是使用cloc工具 …

leecode-下一排列

题目 题目 分析 妈呀&#xff0c;其实我直接调用函数&#xff0c;一行代码就通过了hhh&#xff0c;不过这种取巧的方式不可取&#xff0c;还是得老老实实的写。 首先需要明白什么叫下一排列&#xff1f; 比如输入&#xff1a; 1 5 8 4 7 6 5 3 1 答案就是&#xff1a; 1 5 …

什么是楼宇卫生间智慧厕所系统

楼宇卫生间智慧厕所系统是专为写字楼、办公楼、商场、集团大厦、工厂等应用场景所设计的解决方案。它利用全自动采集和监控智能化、无线数据传输功能&#xff0c;通过云平台管理软件和手机端应用&#xff0c;实现了公厕的智能化管理和使用者的便利。 随着城市化进程的加速&…

[Json]控制返回数据是否包含某个属性

控制返回数据是否包含某个属性 在我们返回给前端的Json格式的数据时&#xff0c;通常我们会定义一个类&#xff0c;里面定义几个成员变量用来定义返回给前端的具体内容&#xff0c;例如&#xff1a; package cn.tedu.csmall.commons.web;import io.swagger.annotations.ApiMo…

HDLbits--Exams/2013 q2bfsm

try1: module top_module (input clk,input resetn, // active-low synchronous resetinput x,input y,output f,output g ); parameter a0,b1,x12,x23,y14,y25,g16,g07;//b为resetn无效后的状态&#xff0c;在b状态使f保持一个周期 //b收到1后转移到x1&#xff0c;x1收到…

基于双机多线程的程序加速设计

摘要 不断提高程序的运行效率&#xff0c;而又不影响程序功能是程序员的不竭追求。本项目旨在利用并行技术进一步提高程序的效率。 程序设计中&#xff0c;主要实现了百万级数据的求和、求最大值以及排序功能。其中&#xff0c;排序功能使用快速排序算法和归并算法实现。共采用…

lenovo联想笔记本ThinkBook 14 Gen5+ IRH(21HW)原装Win11系统镜像原厂OEM恢复出厂状态

LENOVO联想笔记本电脑&#xff0c;ThinkBook 14 Gen5 IRH(21HW)&#xff0c;原厂Windows11原装OEM系统&#xff0c;恢复出厂时状态系统 系统自带所有驱动、出厂主题壁纸LOGO、Office办公软件、联想电脑管家等预装程序 所需要工具&#xff1a;16G或以上的U盘 文件格式&#x…

使用凌鲨查看mysql数据

MySQL是一种开源的关系型数据库管理系统&#xff0c;它被广泛应用于软件开发领域。它具有高可靠性、高性能、易于使用和可扩展性等优点&#xff0c;被许多大型企业和网站所采用。MySQL支持多种编程语言和操作系统&#xff0c;可以轻松地与其他应用程序集成。 继之前我们在凌鲨…

python如何将图片显示在网页上

from flask import Flask, render_template_string import base64 import cv2import osapp Flask(__name__)# 读取图像app.route(/)def index():# 读取图像文件并将其转换为Base64编码的字符串img_path 1.pngimg_data open(img_path, rb).read()img_base64 base64.b64encod…

vue中纯手写单选复选框样式(隐藏原生样式)

基于vue2项目&#xff0c;代码会全部在下面贴出&#xff0c;大家重点关注相关v-for循环实现及样式实现&#xff0c;先看效果&#xff1a; 先看单选 单选组件<easy-radio>&#xff1a; <template><div><div class"radio-item" v-for"(opt…

TLC能力加QLC价格:Solidigm D5-P5430评测

产品介绍 前段时间在Solidigm D5-P5316的帮助下&#xff0c;计算圆周率100万亿位数的世界纪录被刷新&#xff0c;新纪录的计算效率达到之前的三倍。我们一方面能够感受到SSD对高性能计算的影响&#xff0c;另一方面也看到QLC已经在数据中心中得到广泛采用。今天PCEVA评测的是使…

IVIEW常用问题解决

1 FormItem 里面绑定帮助框 导致字段不检验 <FormItem label"备货通知单" prop"noticeIdStr"><Input style"width: 200px;" :title"noticeIdStr"icon"ios-list-box-outline"on-click"showNotice" v-mod…

基于深度学习的细粒度漏洞检测框架VulDeeLocator

源自&#xff1a;IEEE Transactions on Dependable and Secure Computing 作者&#xff1a;Zhen Li, Deqing Zou, Shouhuai Xu, Zhaoxuan Chen, Yawei Zhu, Hai Jin. 背景与动机 设计与实现 图1 VulDeeLocator框架 图2 sSyVC和iSeVC的生成示例 图3 BRNN-vdl模型 实验结果 表…

【④MySQL函数】:让你的数据库操作更高效(一)

前言 ✨欢迎来到小K的MySQL专栏&#xff0c;本节将为大家带来MySQL字符串函数和数学函数的讲解✨ 目录 前言一、字符串函数二、数学函数三、总结 一、字符串函数 函数作用UPPER(列|字符串)将字符串每个字符转为大写LOWER(列|字符串)将字符串每个字符转为小写CONCAT(str1,str2,…

QSS QCalendarWidget

样式分布图&#xff1a; 知道了每个 widget 后&#xff0c;就可以像下面这样用 QSS 修改 QCalendarWidget 的样式了。 示例1&#xff1a; #qt_calendar_calendarview {background: white; }#qt_calendar_navigationbar {background: rgba(215, 215, 215, 255); }QToolButton …

功率放大器在脉冲技术中的应用

脉冲技术是指在时间上极短且能量很强的信号的处理和应用技术&#xff0c;它在通信、雷达、医学、能量研究等领域有着重要的应用。在这些应用过程中&#xff0c;功率放大器是非常重要的关键设备&#xff0c;因为它可以提供高功率脉冲信号&#xff0c;使得这些领域的应用能够顺利…

react antd动态样式实现

<Row><Col style{{ marginBottom: 30px }} ><a className"labelstyle" style{{ padding: fundType.length < 1 ? 0px : 5px, marginRight: fundType.length < 1 ? 0px : 10px }} >{fundType}</a><a className"labelstyle&q…

Gitlab 双重认证和访问令牌的使用

目录 引言 1、双重认证让项目只能使用访问令牌克隆 2、创建项目访问令牌 3、创建群组访问令牌 引言 双重认证可以提高用户账户的安全性&#xff0c;防止密码泄露&#xff0c;他人随意登录。 访问令牌就相当于项目或群组的访问密码&#xff0c;有了它就可以克隆项目。同时访…

作为一名python开发者,想要兼职接单,需要学那些技术?要达到什么水准?为什么要学这些技术?

作为一名Python开发者&#xff0c;学习并且兼职接单可以创造更多的机会和收入。要成为一名具有竞争力的兼职Python开发者&#xff0c;需要学习一系列的技术&#xff0c;并达到一定的水准。本文将详细分析兼职Python开发者需要学习的技术、所需达到的水平&#xff0c;以及为什么…

JavaScript引爆Salesforce职业生涯!抓住高薪机会

Salesforce是一款领先的CRM软件&#xff0c;已被各种规模和行业的企业使用多年。Salesforce不仅易于使用&#xff0c;而且可定制&#xff0c;使企业能够改善其销售、营销、客户服务和其他业务流程。 近年来&#xff0c;Salesforce一直在创新&#xff0c;从传统的基于Oracle的平…