flink优化案例

news2025/1/4 15:34:23

文章目录

  • 一、flink join维表案例
  • 二、flink 双流join案例
  • 三、总结


提示:以下是本篇文章正文内容,下面案例可供参考(适用于flink1.13+)

一、flink join维表案例

  • 背景:flink sql join 维表。job业务不复杂,job写入性能比较差。维表数据大约每天100w条数据(有其他job实时生成维表数据),维表数据只保存近5天数据。
  • job 资源使用情况:TM 1cpu,4Gb内存,1个并行度
  • 性能问题:job每秒写数据慢(已检查:checkpoint生成很快,生成的文件也小)
  • 开始优化
    优化思路:对维表参数的优化参数配置
对定义维表参数的优化参数配置(下面定义维表参数flink官网有参数或类似的参数。提供思路)
  'cache' = 'LRU'  --缓存策略
  'async' = 'true',  
  'cacheEmpty' = 'false',
  'cacheSize'='5000000',  --缓存条数500万条(思路:希望将所有维表数据全部缓存到内存中)
  'cacheTTLMs' = '10800000' --缓存维表时间(缓存3小时,不希望缓存过段或过长导致查源数据库表)

运行后性能比之前没有添加参数要快(相同资源下由4k/s->提升到6k/s)

当以为调优成功时,发现运行一段时间job开始下降。由处理能力6k/s下降到几百条/s,数据有挤压,延时数据开始增大。

之前cpu不变的情况下4Gtaskmanager 内存 30分钟后性能开始下降;
在这里插入图片描述
现在cpu不变的情况下8Gtaskmanager 内存 60分钟后性能开始下降(处理性能下降导致数据开始有堆积);
在这里插入图片描述
后面开始直接对job tm 增加CPU,增加内存都是运行一小段时间,性能还是开始下降.
观察生成的DAG图发现有节点一直处于busy
在这里插入图片描述
继续各种尝试。开始对busy的节点增加并行度(阿里云flink有专家模式支持,flink开源版不支持此功能)
table.exec.split-slot-sharing-group-per-vertex=false
(作业链与处理槽共享组(默认为false),开启后在针对某个操作算子增加并行度和cu等资源时,不与其他槽位共享资源,单独增加额外资源 ###有用的参数)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
总结:a.先优化维表参数,当优化完维表参数后增加资源运行一段时间性能还是下降,开始对节点单独做调整(某个节点性能较弱,单独增加并行度和资源)。如果对整体job添加资源也是可以解决问题,但比较浪费资源。建议针对性能节点单独处理比较好。

二、flink 双流join案例

背景:flink 双流join,处理完后写表,业务逻辑不复杂
问题现象:job 处理性能差,消费数据有大量堆积延时。
表现问题现象:checkpoint 生成很久后失败(或全部失败或频繁失败)
在这里插入图片描述
在这里插入图片描述
可以看下面flink生成的DAG图末尾的sink写表节点已经完全处于卡住状态(0条写入)
在这里插入图片描述

后面对这个job增加资源增加并行度(当分配很少资源时,job运行半天后CP开始一直失败)时,整个job刚开始只能成功创建一个CP后面创建CP全部失败。

  • 调查发现:作业DAG有个SinkMaterializer算子节点(一般双流join会有这个节点,其他操作没有这个节点。且这个节点一直处于busy 如上图),而且检查checkpoint历史时发现该算子state越来越大。
  • SinkMaterializer的算子节点作用:这个算子将输入的记录以upsert key作区分保存到state中,
    并为下游算子提供一个upsert视图。目的:为了解决changelog流事件乱序造成了结果不正确的问题.
  • 问题解决:根据上面查的资料,在根据自己的业务情况(晚来的右表数据大部分是一样的,可以理解一对多,同时即使右表同一个Key下有少量个别字段有少表不一样对业务也不会造成影响)。是可以接受极少异常情况晚来的相同key的值数据。
  • 做法:对job添加参数:table.exec.sink.upsert-materialize=NONE (此参数开源flink,阿里云flink都通用) 运行后作业DAG就没有SinkMaterializer算子节点,且job处理性能极强(tm:1cpu,4Gb 每秒sink接近30k/s)
  • 参数描述 : 由于分布式系统中的 shuffle 会造成 Changelog 数据的乱序,所以 sink 接收到的数据可能在全局的 upsert 中乱序,所以要在 upsert sink 之前添加一个 upsert 物化算子。该算子接收上游 changelog
    数据,并且给下游生成一个 upsert 视图。这个参数用于控制物化算子的添加
  • 注意事项 : A. 默认情况下,在唯一 key 遇到分布式乱序时,该物化算子会被添加,也可以选择不物化(NONE),或者是强制物化(FORCE) B.
    可选值有:NONE、AUTO、FORCE

10分钟内将延时6个小时数据给全部追上(之前没有加那个参数,tm比这个资源配置高,每秒几十条数据,运行半天后job卡住最后0写入)
在这里插入图片描述
在这里插入图片描述

  • 添加完那个参数后CP生成很快很稳定,CP也大幅度变小。再无失败CP.
    在这里插入图片描述

  • 参考:参考1,参考2

  • 小总结:不加那个参数就不会有sink matertilizer那个节点。之前那个节点State比较大,不加的话不会有排序操作,加上的话会把数据缓存下来,修正乱序的问题,所以State会大。

三、总结

  • 上面是两个真实优化案例。优化的方向不同,应该是普通job的优化(维表属性定义和节点调优),一个是有job的写运行机制优化(结合自身业务提升job性能)

CheckPoint说明:
每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。

  • CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier;
  • 当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状 态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告 自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理;
  • 下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身 快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理;
  • 每个算子按照上面步骤不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。
  • 当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败 ;
  • 一旦发生了错误,Flink的JobManager会告诉 task需要从最新的checkpoint中恢复,它可以是全量的或者是增量的。之后TaskManager从分布式系统中下载checkpoint文件, 然后从中恢复状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1672815.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ubuntu20.04运行python文件时,报错No module named ‘rospkg’】

**问题原因:**一般来说,并不是真的缺少rospkg,而是系统中存在多个python版本导致的混乱 检查python版本 Ubuntu20.04 —— pyhon3.8 sudo apt-get install python3.8最新版本,如下图所示 查看python3.8的位置 whereis python…

C++ | Leetcode C++题解之第88题合并两个有序数组

题目&#xff1a; 题解&#xff1a; class Solution { public:void merge(vector<int>& nums1, int m, vector<int>& nums2, int n) {int p1 m - 1, p2 n - 1;int tail m n - 1;int cur;while (p1 > 0 || p2 > 0) {if (p1 -1) {cur nums2[p2-…

让创意在幻觉中肆虐: 认识Illusion Diffusion AI

人工智能新境界 在不断发展的人工智能领域,一款非凡的新工具应运而生,它能将普通照片转化为绚丽的艺术品。敬请关注Illusion Diffusion,这是一个将现实与想象力完美融合的AI驱动平台,可创造出迷人的视错觉和超现实意境。 AI算法的魔力所在 Illusion Diffusion 的核心是借助先进…

每日OJ题_贪心算法四⑧_力扣767. 重构字符串

目录 力扣767. 重构字符串 解析代码 力扣767. 重构字符串 767. 重构字符串 难度 中等 给定一个字符串 s &#xff0c;检查是否能重新排布其中的字母&#xff0c;使得两相邻的字符不同。 返回 s 的任意可能的重新排列。若不可行&#xff0c;返回空字符串 "" 。 …

测试页打印失败。是否要参阅打印疑难解答以获得帮助?服务器打印后台处理程序服务没有运行。请重新启动服务器上的打印后台处理程序或重新启动服务器计算机。

问题&#xff1f; 测试页打印失败。是否要参阅打印疑难解答以获得帮助? 解决办法&#xff1a; 方法1、 请重新启动服务器上的打印后台处理程序或重新启动服务器计算机。 方法2、 找到services服务找到print spooler停止运行&#xff0c; C:\Windows\System32\spool -------…

回炉重造java----JVM

为什么要使用JVM ①一次编写&#xff0c;到处运行&#xff0c;jvm屏蔽字节码与底层的操作差异 ②自动内存管理&#xff0c;垃圾回收功能 ③数组下边越界检查 ④多态 JDK&#xff0c;JRE&#xff0c;JVM的关系 JVM组成部分 JVM的内存结构 《一》程序计数器(PC Register) 作用…

记录MySQL数据库查询不等于xxx时的坑

目录 一、背景 二、需求 三、方法 四、示例 一、背景 在使用MySQL数据库查询数据时&#xff0c;需要查询字段name不等于xxx的记录&#xff0c;通过where name ! xxx查询出来的记录不符合预期&#xff0c;通过检查发现少了name字段为null的记录&#xff0c;后经查询得知在My…

Python | Leetcode Python题解之第88题合并两个有序数组

题目&#xff1a; 题解&#xff1a; class Solution:def merge(self, nums1: List[int], m: int, nums2: List[int], n: int) -> None:"""Do not return anything, modify nums1 in-place instead."""p1, p2 m - 1, n - 1tail m n - 1whi…

【吃透Java手写】5-RPC-简易版

【吃透Java手写】RPC-简易版-源码解析 1 RPC1.1 RPC概念1.2 常用RPC技术或框架1.3 初始工程1.3.1 Productor-common&#xff1a;HelloService1.3.2 Productor&#xff1a;HelloServiceImpl1.3.3 Consumer 2 模拟RPC2.1 Productor2.2 模拟一个RPC框架2.2.1 HttpServer2.2.2 Http…

Elasticsearch解决字段膨胀问题

文章目录 背景Flattened类型的产生Flattened类型的定义基于Flattened类型插入数据更新Flattened字段并添加数据Flattened类型检索 Flattened类型的不足 背景 Elasticsearch映射如果不进行特殊设置&#xff0c;则默认为dynamic:true。dynamic:true实际上支持不加约束地动态添加…

【AI大模型】自动生成红队攻击提示--GPTFUZZER

本篇参考论文为&#xff1a; Yu J, Lin X, Xing X. Gptfuzzer: Red teaming large language models with auto-generated jailbreak prompts[J]. arXiv preprint arXiv:2309.10253, 2023. https://arxiv.org/pdf/2309.10253 一 背景 虽然LLM在今天的各个领域得到了广泛的运用…

AI办公自动化-用kimi自动清理删除重复文件

在kimichat中输入提示词&#xff1a; 你是一个Python编程专家&#xff0c;要完成一个编写Python脚本的任务&#xff0c;具体步骤如下&#xff1a; 1、打开文件夹D:\downloads&#xff1b; 2、哈希值比较比较里面所有的文件&#xff0c;如果文件相同&#xff0c;那么移动多余…

3D Gaussian Splatting for Real-Time Radiance Field Rendering 论文阅读

如此热门的项目&#xff0c;网络上有很多大牛分析了这篇文章的做法&#xff0c;在这里简单记录一下个人粗浅的理解。 关于各种数学表达式的推导&#xff0c;论文和参考资料中都提供了较为详细的解读&#xff0c;本人能力有限&#xff0c;这一部分理解不够深刻&#xff0c;先不做…

绝地求生:艾伦格回归活动来了,持续近1个月,新版本皮肤、G币等奖励白嫖

嗨&#xff0c;我是闲游盒~ 29.2版本更新在即&#xff0c;新活动来啦&#xff01;目前这个活动国内官方还没发&#xff0c;我就去台湾官方搬来了中文版方便大家观看&#xff0c;也分析一下这些奖励应该怎样才能获得。 新版本将在周二进行约9小时的停机维护&#xff0c;请注意安…

centos7中如何优雅的动态切换jdk版本?

在 CentOS 7 中动态切换 JDK 版本可以通过多种方法实现&#xff0c;其中最常见的方法是使用 alternatives 命令&#xff0c;这是 CentOS 和其他基于 Red Hat 的系统中用于管理多个软件版本的标准工具。下面我会详细介绍如何使用 alternatives 命令来切换 JDK 版本。 步骤 1: 安…

如何通过 AWS Managed Apache Flink 实现 Iceberg 的实时同步

AWS Managed Apache Flink &#xff08;以下以 MAF 代指&#xff09;是 AWS 提供的一款 Serverless 的 Flink 服务。 1. 问题 大家在使用 MAF 的时候&#xff0c;可能遇到最大的一个问题就是 MAF 的依赖管理&#xff0c;很多时候在 Flink 上运行的代码&#xff0c;托管到 MAF…

[Algorithm][回溯][找出所有子集的异或总和再求和][全排列 II][电话号码的字母组合][括号生成]详细讲解

目录 1.找出所有子集的异或总和再求和1.题目链接2.算法原理详解3.代码实现 2.全排列 II1.题目链接2.算法原理详解3.代码实现 3.电话号码的字母组合1.题目链接2.算法原理详解3.代码实现 4.括号生成1.题目链接2.算法原理详解3.代码实现 1.找出所有子集的异或总和再求和 1.题目链…

PCIE协议-2-事务层规范-TLP Prefix Rules

2.2.10 TLP前缀规则 以下规则适用于任何包含TLP前缀的TLP&#xff1a; 对于任何TLP&#xff0c;TLP中byte0的Fmt[2:0]字段中的值100b表示存在TLP前缀&#xff0c;并且Type[4]位指示TLP前缀的类型。 Type[4]位中的值0b表示存在本地TLP前缀。Type[4]位中的值1b表示存在端到端TL…

数据结构与算法-排序算法1-冒泡排序

本文先介绍排序算法&#xff0c;然后具体写冒泡排序。 目录 1.排序算法简介 2.常见的排序算法分类如下图&#xff1a; 3.冒泡排序&#xff1a; 1.介绍&#xff1a; 2.动态图解 3.举例 4.小结冒泡排序规则 5.冒泡排序代码 6.优化 7.优化后时间 代码&#xff1a; 运…

Java | Leetcode Java题解之第88题合并两个有序数组

题目&#xff1a; 题解&#xff1a; class Solution {public void merge(int[] nums1, int m, int[] nums2, int n) {int p1 m - 1, p2 n - 1;int tail m n - 1;int cur;while (p1 > 0 || p2 > 0) {if (p1 -1) {cur nums2[p2--];} else if (p2 -1) {cur nums1[p…