Flink 优化 (二) --------- 状态及 Checkpoint 调优

news2024/11/23 7:14:03

目录

  • 一、RocksDB 大状态调优
    • 1. 开启 State 访问性能监控
    • 2. 开启增量检查点和本地恢复
    • 3. 调整预定义选项
    • 4. 增大 block 缓存
    • 5. 增大 write buffer 和 level 阈值大小
    • 6. 增大 write buffer 数量
    • 7. 增大后台线程数和 write buffer 合并数
    • 8. 开启分区索引功能
    • 9. 参数设定案例
  • 二、Checkpoint 设置


一、RocksDB 大状态调优

RocksDB 是基于 LSM Tree 实现的(类似 HBase),写数据都是先缓存到内存中,所以 RocksDB 的写请求效率比较高。RocksDB 使用内存结合磁盘的方式来存储数据,每次获取数据时,先从内存中 blockcache 中查找,如果内存中没有再去磁盘中查询。使用 RocksDB 时,状态大小仅受可用磁盘空间量的限制,性能瓶颈主要在于 RocksDB 对磁盘的读请求,每次读写操作都必须对数据进行反序列化或者序列化。当处理性能不够时,仅需要横向扩展并行度即可提高整个 Job 的吞吐量。

在这里插入图片描述
从 Flink1.10 开始,Flink 默认将 RocksDB 的内存大小配置为每个 task slot 的托管内存。调试内存性能的问题主要是通过调整配置项 taskmanager.memory.managed.size 或者 taskmanager.memory.managed.fraction 以增加 Flink 的托管内存(即堆外的托管内存)。进一步可以调整一些参数进行高级性能调优,这些参数也可以在应用程序中通过RocksDBStateBackend.setRocksDBOptions(RocksDBOptionsFactory)指定。下面介绍提高资源利用率的几个重要配置:

1. 开启 State 访问性能监控

Flink 1.13 中引入了 State 访问的性能监控,即 latency trackig state。此功能不局限于 State Backend 的类型,自定义实现的 State Backend 也可以复用此功能。

在这里插入图片描述
State 访问的性能监控会产生一定的性能影响,所以,默认每 100 次做一次取样(sample),对不同的 State Backend 性能损失影响不同:

对于 RocksDB State Backend,性能损失大概在 1% 左右
对于 Heap State Backend,性能损失最多可达 10%

state.backend.latency-track.keyed-state-enabled:true #启用访问状态的性能监控
state.backend.latency-track.sample-interval: 100 #采样间隔
state.backend.latency-track.history-size: 128 #保留的采样数据个数,越大越精确
state.backend.latency-track.state-name-as-variable: true #将状态名作为变量

正常开启第一个参数即可。

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dstate.backend.latency-track.keyed-state-enabled=true \
-c com.fancy.flink.tuning.RocksdbTuning \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

2. 开启增量检查点和本地恢复

A、开启增量检查点

RocksDB 是目前唯一可用于支持有状态流处理应用程序增量检查点的状态后端,可以修改参数开启增量检查点:

state.backend.incremental: true #默认 false,改为 true。

或代码中指定

new EmbeddedRocksDBStateBackend(true)

B、开启本地恢复

当 Flink 任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从 hdfs 拉取数据。本地恢复目前仅涵盖键控类型的状态后端(RocksDB),MemoryStateBackend 不支持本地恢复并忽略此选项。

state.backend.local-recovery: true

C、设置多目录

如果有多块磁盘,也可以考虑指定本地多目录

state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb

注意:不要配置单块磁盘的多个目录,务必将目录配置到多块不同的磁盘上,让多块磁盘来分担压力。

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dstate.backend.incremental=true \
-Dstate.backend.local-recovery=true \
-Dstate.backend.latency-track.keyed-state-enabled=true \
-c com.fancy.flink.tuning.RocksdbTuning \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

3. 调整预定义选项

Flink 针对不同的设置为 RocksDB 提供了一些预定义的选项集合,其中包含了后续提到的一些参数,如果调整预定义选项后还达不到预期,再去调整后面的 block、writebuffer 等参数。

当前支持的预定义选项有 DEFAULT 、 SPINNING_DISK_OPTIMIZED 、SPINNING_DISK_OPTIMIZED_HIGH_MEM 或 FLASH_SSD_OPTIMIZED。有条件上 SSD
的,可以指定为 FLASH_SSD_OPTIMIZED

state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM 
#设置为机械硬盘+内存模式

4. 增大 block 缓存

整个 RocksDB 共享一个 block cache,读数据时内存的 cache 大小,该参数越大读数据时缓存命中率越高,默认大小为 8 MB,建议设置到 64 ~ 256 MB。

state.backend.rocksdb.block.cache-size: 64m #默认 8m

5. 增大 write buffer 和 level 阈值大小

RocksDB 中,每个 State 使用一个 Column Family,每个 Column Family 使用独占的 write buffer,默认 64MB,建议调大。

调整这个参数通常要适当增加 L1 层的大小阈值 max-size-level-base,默认 256m。

该值太小会造成能存放的 SST 文件过少,层级变多造成查找困难,太大会造成文件过多,合并困难。建议设为 target_file_size_base(默认 64MB)的倍数,且不能太小,例如 5 ~ 10倍,即 320~640MB。

state.backend.rocksdb.writebuffer.size: 128m
state.backend.rocksdb.compaction.level.max-size-level-base: 320m

6. 增大 write buffer 数量

每个 Column Family 对应的 writebuffer 最大数量,这实际上是内存中“只读内存表“的最大数量,默认值是 2。对于机械磁盘来说,如果内存足够大,可以调大到 5 左右

state.backend.rocksdb.writebuffer.count: 5

7. 增大后台线程数和 write buffer 合并数

(1) 增大线程数

用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以改为 4 等更大的值

state.backend.rocksdb.thread.num: 4

(2) 增大 writebuffer 最小合并数

将数据从 writebuffer 中 flush 到磁盘时,需要合并的 writebuffer 最小数量,默认值为 1,可以调成 3。

state.backend.rocksdb.writebuffer.number-to-merge: 3

8. 开启分区索引功能

Flink 1.13 中对 RocksDB 增加了分区索引功能,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的 partitioned Index 做了多级索引也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。线上测试中,相对于内存比较小的场景中,性能提升 10 倍左右。如果在内存管控下 Rocksdb 性能不如预期的话,这也能成为一个性能优化点。

state.backend.rocksdb.memory.partitioned-index-filters:true #默认 false

9. 参数设定案例

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dstate.backend.incremental=true \
-Dstate.backend.local-recovery=true \
-Dstate.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM \
-Dstate.backend.rocksdb.block.cache-size=64m \
-Dstate.backend.rocksdb.writebuffer.size=128m \
-Dstate.backend.rocksdb.compaction.level.max-size-level-base=320m \
-Dstate.backend.rocksdb.writebuffer.count=5 \
-Dstate.backend.rocksdb.thread.num=4 \
-Dstate.backend.rocksdb.writebuffer.number-to-merge=3 \
-Dstate.backend.rocksdb.memory.partitioned-index-filters=true \
-Dstate.backend.latency-track.keyed-state-enabled=true \
-c com.atguigu.flink.tuning.RocksdbTuning \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

二、Checkpoint 设置

一般需求,我们的 Checkpoint 时间间隔可以设置为分钟级别(1 ~ 5 分钟)。对于状态很大的任务每次 Checkpoint 访问 HDFS 比较耗时,可以设置为 5~10 分钟一次Checkpoint,并且调大两次 Checkpoint 之间的暂停间隔,例如设置两次 Checkpoint 之间至少暂停 4 或 8 分钟。同时,也需要考虑时效性的要求,需要在时效性和性能之间做一个平衡,如果时效性要求高,结合 end- to-end 时长,设置秒级或毫秒级。如果 Checkpoint 语义配置为 EXACTLY_ONCE,那么在 Checkpoint 过程中还会存在 barrier 对齐的过程,可以通过 Flink Web UI 的 Checkpoint 选项卡来查看 Checkpoint 过程中各阶段的耗时情况,从而确定到底是哪个阶段导致 Checkpoint 时间过长然后针对性的解决问题。

RocksDB 相关参数在前面已说明,可以在 flink-conf.yaml 指定,也可以在 Job 的代码中调用 API 单独指定,这里不再列出。

// 使⽤ RocksDBStateBackend 做为状态后端,并开启增量 Checkpoint
RocksDBStateBackend rocksDBStateBackend = new  RocksDBStateBackend("hdfs://hadoop1:8020/flink/checkpoints", true);
env.setStateBackend(rocksDBStateBackend);
// 开启 Checkpoint,间隔为 3 分钟
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小间隔 4 分钟
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))
// 超时时间 10 分钟
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 保存 checkpoint
checkpointConf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/424959.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

拐点!智能座舱破局2023

“这是我们看到的整个座舱域控渗透率,2022年是8.28%,主力的搭载车型仍然是30-35万区间。”3月29日,2023年度(第五届)高工智能汽车市场峰会上,高工智能汽车研究院首发《2022-2025年中国智能汽车产业链市场数…

WRF中替换LAI数据

WRF中替换LAI数据 本次下载的LAI数据是GLASS中的AVHRR(1981-2018)(V50),可以从这个这里获取数据GLASS_AVHRR_LAI数据集。由于我需要做一个时间序列的模拟,总共下载了从1990-2018年灌溉季3-9月份对应的天数为73,105,137&#xff0…

人工智能大时代——AIGC综述

生成式AI分类 模型按照输入输出的数据类型分类,目前主要包括9类。 有趣的是,在这些已发布大模型的背后,只有六个组织(OpenAI, Google, DeepMind, Meta, runway, Nvidia)参与部署了这些最先进的模型。 其主要原因是&am…

11.基于粒子群算法的含风光燃储微网优化调度(论文复现)

说明书 相关代码资源:基本算法智能微电网粒子群优化算法,微源:光伏、风机、发电机、储能等 基于多目标算法的冷热电联供型综合能源系统运行优化 基于多目标粒子群算法冷热电联供综合能源系统运行优化 MATLAB代码:基于粒子群算法的含风光燃…

0成本 使用home assistant远程开关机电脑

环境:dockerwin10HACS 问题:在外网手机上远程开关机家中电脑 解决办法:开机:WOL,关机ssh命令 背景:在部署HACS后,便想用HACS中的命令来开关机windows电脑,开机很简单,使用…

暴力破解之验证码识别

文章目录背景操作步骤1、安装python模块2、安装Captcha-killer模块3、尝试进行验证码识别背景 渗透测试过程中,现在验证码越来越多,这对测试的时候遇到的阻力不小,一位大佬给我安利了一个burp插件,Captcha-killer,可以…

ROS开发之如何使用ICM20948 IMU模块?

文章目录0.引言1.创建工作空间2.获取IMU功能包并编译3.检查IMU端口4.启动launch显示IMU测量结果0.引言 笔者研究课题涉及多传感器融合,除了前期对ROS工具的学习,还需要用IMU获取数据,对其他传感器的姿态纠正。本文使用IMU模块获取姿态数据。I…

华为乾坤王辉:新一代网络安全融合体系,筑牢企业数字化转型基石丨2023 INSEC WORLD

科技云报道原创。 随着数字化时代的到来,网络安全形势持续动荡。 围绕产业未来发展趋势、信息安全产业可持续发展、信息安全技术发展路径等话题,一场信息安全行业年度盛会——INSEC WORLD世界信息安全大会在西安盛大召开。 本届大会汇聚了近50位海内…

大数据技术(入门篇) --- 使用 Spring Boot 操作 CDH6.2.0 Hadoop

前言 本人是web后端研发,习惯使用spring boot 相关框架,因此技术选型直接使用的是spring boot,目前并未使用 spring-data-hadoop 依赖,因为这个依赖已经在 2019 年终止了,可以点击查看 ,所以我这里使用的是…

防火墙的IPSECVPN点到点实验 dsvpn多层分支实验

目录 防火墙的IPSECVPN点到点实验 dsvpn多层分支实验 ​编辑 防火墙的IPSECVPN点到点实验 配置路由器接口IP 配置接口防火墙IP 写放通的策略 ping对端防火墙的接口看是否能ping通 ipsec进行配置 配置往返流量 dsvpn多层分支实验 先配置IP 2,配置静态IP 3&#xf…

拦截导弹 导弹防御系统

拦截导弹 & 导弹防御系统拦截导弹导弹防御系统拦截导弹 题目链接:acwing1010. 拦截导弹 题目描述: 输入输出: 分析: 第一个问题为输出最长递减子序列,由于导弹数在1000以内所以采用时间复杂度为O(n2)O(n^2)O(n2)或者O(nlogn)O(nlogn)O…

介绍一款idea神级插件【Bito-ChatGPT】

什么是Bito? Bito是一款在IntelliJ IDEA编辑器中的插件,Bito插件是由ChatGPT团队开发的,它是ChatGPT团队为了提高开发效率而开发的一款工具。ChatGPT团队是一支专注于自然语言处理技术的团队,他们开发了一款基于GPT的自然语言处理…

[oeasy]python0133_[趣味拓展]好玩的unicode字符_另类字符_上下颠倒英文字符

另类字符 回忆上次内容 上次再次输出了大红心♥ 找到了红心对应的编码黑红梅方都对应有编码 原来的编码叫做 ascii️ \u这种新的编码方式叫unicode包括了 中日韩字符集等 各书写系统的字符集 除了这些常规字符之外 还有什么好玩的东西呢? 颠倒字符 这个网站可以…

DQN基本概念和算法流程(附Pytorch代码)

❀DQN算法原理 DQN,Deep Q Network本质上还是Q learning算法,它的算法精髓还是让Q估计Q_{估计}Q估计​尽可能接近Q现实Q_{现实}Q现实​,或者说是让当前状态下预测的Q值跟基于过去经验的Q值尽可能接近。在后面的介绍中Q现实Q_{现实}Q现实​也…

提高工作效率必备,5款实用的Windows系统工具推荐

每次分享实用的软件,都会给人一种踏实和喜悦的感觉,这也是我热衷于搜集和推荐高效工具软件的原因。 音量控制——EarTrumpet EarTrumpet是一款音量控制工具,可以让你更方便地调节Windows系统中不同应用程序的音量。你可以使用EarTrumpet来替代系统自带的音量混合器…

表单设计器开源的定义和应用场景布局介绍

为了实现提质增效的办公自动化,表单设计器开源工具的应用变得广泛起来。在低代码开发市场昌盛发展的今天,不少企业期望通过快速、现成的快速配置表单工具实现高效率表单制作,那么,现在给大家介绍的这款开发易用性强、组件丰富、高…

设计模式 -- 门面模式

前言 月是一轮明镜,晶莹剔透,代表着一张白纸(啥也不懂) 央是一片海洋,海乃百川,代表着一块海绵(吸纳万物) 泽是一柄利剑,千锤百炼,代表着千百锤炼(输入输出) 月央泽,学习的一种过程,从白纸->吸收各种知识->不断输入输出变成自己的内容 希望大家一起坚持这个过程,也同…

stable-diffusion真的好用吗?

hi,各位大佬,今天尝试下diffusion大模型,也是CV领域的GPT,但需要prompt,我给了prompt结果并不咋滴,如下示例,并附代码及参考link 1、img2img 代码实现: import torch from PIL im…

PageHelper的使用

这个分页插件是在Mybatis的环境中使用的&#xff0c;所以项目需要导入Mybatis依赖 更加详细的用法看官方文档&#xff1a;PageHelper官网 在Mybatis中使用 前提条件 引入依赖 <dependency><groupId>com.github.pagehelper</groupId><artifactId>pa…

GANs和Generative Adversarial Nets和Vox2Vox: 3D-GAN for Brain Tumour Segmentation

参考&#xff1a; 各种生成模型&#xff1a;VAE、GAN、flow、DDPM、autoregressive models https://blog.csdn.net/zephyr_wang/article/details/126588478李沐GAN精度 x.1 生成模型家族 DGMs&#xff08;Deep Generatitve Models&#xff09;家族主要有&#xff1a;GAN&…