Flink问题解决及性能调优-【Flink不同并行度引起sink2es报错问题】

news2025/1/12 19:08:20

最近需求,仅想提高sink2es的qps,所以仅调节了sink2es的并行度,但在调节不同算子并行度时遇到一些问题,找出问题的根本原因解决问题,并分析整理。

实例代码

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 ms

CREATE TABLE kafka_table (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),
     publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),
     msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),
     send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])
     --event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
     --WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 't1',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
   --  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交
  'format' = 'json'
);



CREATE TABLE es_sink(
     send_type      STRING
    ,account_id     STRING
    ,publish_time   STRING
    ,grouping_id       INTEGER
    ,init           INTEGER
    ,init_cancel    INTEGER
    ,push          INTEGER
    ,succ           INTEGER
    ,fail           INTEGER
    ,init_delete    INTEGER
    ,update_time    STRING
    ,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value'='true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);

CREATE view  tmp as
select
    send_type,
    account_id,
    publish_time,
    msg_status,
    case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,
    case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,
    case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,
    case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,
    case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,
    case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,
    event_time,
    opt,
    ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');


--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6

CREATE view  tmp_groupby as
select
 COALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1
         when send_type is not null and account_id is null and publish_time is null then 2
         when send_type is not null and account_id is not null and publish_time is null then 3
         when send_type is not null and account_id is not null and publish_time is not null then 4
         end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上

INSERT INTO es_sink
select
     send_type
    ,account_id
    ,publish_time
    ,grouping_id
    ,init
    ,init_cancel
    ,push
    ,succ
    ,fail
    ,init_delete
    ,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

发现问题

由于groupby或join聚合等算子操作的并行度与sink2es算子操作的并行度不同,上游算子同一个key的数据可能会下发到下游多个不同算子中。
导致sink2es出现多个subtask同时操作同一个key(这里key作为主键id),报错如下:

...
	Caused by: [test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]
		at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
		at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
		at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
		at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
		at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
		at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
		at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
		at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
		at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
		at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
		at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
		at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
		at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
		... 1 more
	[CIRCULAR REFERENCE:[test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]]

问题原因

Flink中存在八种分区策略,常用Operator Chain链接方式有三种分区器:

  • forward:上下游并行度相同,且不发生shuffle,直连的分区器
  • hash:将数据按照key的Hash值下发到下游的算子中
  • rebalance:数据会被循环或者随机下发到下游算子中,改变并行度若无keyby,默认使用RebalancePartitioner分区策略

rebalance分区器,可能会将上游算子的同一个key随机下发到下游不同算子中,因而引起报错,如下图:
在这里插入图片描述在这里插入图片描述模型如下:

在这里插入图片描述

解决方案

  • 分组聚合算子与sink2es算子配置成相同的并行度,即使用forward分区器,如下图:
    在这里插入图片描述在这里插入图片描述
    另外sink2es forward分区器上游operator chain已经通过hash分区器保证了同一个key只能下发到下游一个subtask实例中

模型如下:
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/215b7ba208b142da803a78d85b0f474e.png

  • sink2es算子的并行度配置为1,如下图:

operator chain为forward分区器模型如下:
在这里插入图片描述

总结

归根结底就是需要保证:上游subtask中同一个key只能下发到下游一个subtask中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1417458.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

泛微E-Cology getE9DevelopAllNameValue2 任意文件读取漏洞复现

0x01 产品简介 泛微协同管理应用平台e-cology是一套兼具企业信息门户、知识文档管理、工作流程管理、人力资源管理、客户关系管理、项目管理、财务管理、资产管理、供应链管理、数据中心功能的企业大型协同管理平台。泛微E-Cology 依托全新的设计理念,全新的管理思想。为中大…

WordPress块编辑器(Gutenberg古腾堡)中如何添加脚注?

WordPress默认自带的块编辑器​&#xff08;Gutenberg古腾堡编辑器&#xff09;本身就自带添加脚注功能&#xff0c;不过经典编辑器不行。如果想要在WordPress中添加更加专业的脚注&#xff0c;建议使用Modern Footnotes插件&#xff0c;具体介绍及使用请参考『WordPress站点如…

你好,C++对象

你好&#xff0c;对象 面向对象开发对象的定义 类与对象类的定义类的访问限定符及封装类的实例化类对象模型结构体内存对齐规则 this指针this指针的引入 this指针的特性 类的默认成员函数构造函数析构函数拷贝构造函数结语 面向对象开发 对象的定义 对象的含义是指具体的某一…

NIO-Channel详解

NIO-Channel详解 1.Channel概述 Channel即通道&#xff0c;表示打开IO设备的连接&#xff0c;⽐如打开到⽂件、Socket套接字的连接。在使⽤NIO时&#xff0c;必须要获取⽤于连接IO设备的通道以及⽤于容纳数据的缓冲区。通过操作缓冲区&#xff0c;实现对数据的处理。也就是说…

【大厂AI课学习笔记】1.1.4 学科和学习路径

一、8大学科 特点是特点 &#xff1a;厚基础、重交叉、宽口径。 八大学科分别是&#xff1a;数学与统计、科学与工程、计算机科学与技术、人工智能核心、认知与神经科学、先进机器人技术、人工智能工具与平台。 每个学科&#xff0c;又向下延伸。 MORE: AI&#xff0c;即人…

深度强化学习(王树森)笔记05

深度强化学习&#xff08;DRL&#xff09; 本文是学习笔记&#xff0c;如有侵权&#xff0c;请联系删除。本文在ChatGPT辅助下完成。 参考链接 Deep Reinforcement Learning官方链接&#xff1a;https://github.com/wangshusen/DRL 源代码链接&#xff1a;https://github.c…

【算法与数据结构】139、LeetCode单词拆分

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;本题可以看做一个动态规划问题。其中&#xff0c;字符串s是背包&#xff0c;而字典中的单词就是物品。…

vue 支付宝支付笔记总结

Vue 支付宝支付 1、支付宝介绍 支付宝&#xff08;中国&#xff09;网络技术有限公司成立于2004年&#xff0c;是国内的第三方支付平台&#xff0c;致力于为企业和个人提供“简单、安全、快速、便捷”的支付解决方案。支付宝公司从2004年建立开始&#xff0c;始终以“信任”作…

CTF-PWN-堆-【chunk extend/overlapping-2】(hack.lu ctf 2015 bookstore)

文章目录 hack.lu ctf 2015 bookstore检查IDA源码main函数edit_notedelete_notesubmit .fini_array段劫持(回到main函数的方法)思路python格式化字符串简化思路&#xff1a; exp 佛系getshell 常规getshell hack.lu ctf 2015 bookstore 检查 got表可写&#xff0c;没有地址随…

1、缓存击穿背后的问题

当面试官问&#xff1a;你知道什么是缓存击穿吗&#xff0c;你们是如何解决的&#xff1f; 首先我们要了解什么是缓存击穿&#xff1f;以及缓存击穿会引发什么问题&#xff1f; 缓存击穿就是redis中的热点数据过期&#xff0c;缓存失效&#xff0c;导致大量的请求直接打到数据…

【c++】高精度算法(洛谷刷题2024)玩具谜题详解(含图解)

系列文章目录 第三题&#xff1a;玩具谜题 视频讲解&#xff1a;http://【洛谷题单 - 算法 - 高精度】https://www.bilibili.com/video/BV1Ym4y1s7BD?vd_source66a11ab493493f42b08b31246a932bbb 文章目录 目录 系列文章目录 文章目录 前言 一、题目分析以及思考 二、代码…

伊恩·斯图尔特《改变世界的17个方程》相对论笔记

它告诉我们什么&#xff1f; 物质包含的能量等于其质量乘以光速的平方。 为什么重要&#xff1f; 光的速度很快&#xff0c;它的平方绝对是一个巨大的数。1千克的物质释放出的能量相当于史上最大的核武器爆炸所释放能量的约40%。一系列相关的方程改变了我们对空间、时间、物质和…

C/C++ - 函数进阶(C++)

目录 默认参数 函数重载 内联函数 函数模板 递归函数 回调函数 默认参数 定义 默认参数是在函数声明或定义中指定的具有默认值的函数参数。默认参数允许在调用函数时可以省略对应的参数&#xff0c;使用默认值进行替代。 使用 默认参数可以用于全局函数和成员函数。默认参…

WebRTC 入门:开启实时通信的新篇章(上)

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

使用电脑时突然遇到“mfc140.dll文件丢失”的问题都有什么解决办法

当你在使用电脑时突然遇到“mfc140.dll文件丢失”的问题时&#xff0c;可能会感到困惑和苦恼。一旦出现这样的问题&#xff0c;缺少这个文件可能导致一些应用程序无法正常启动&#xff0c;影响你的工作和娱乐体验。其实这个问题是可以解决的&#xff0c;接下来我们将介绍一些可…

部署个人知识库管理软件 MrDoc详细教程

效果 一、拉取 MrDoc 代码 进入目录&#xff1a; cd /opt开源版&#xff1a; git clone https://gitee.com/zmister/MrDoc.git专业版&#xff1a; git clone https://{用户名}:{密码}git.mrdoc.pro/MrDoc/MrDocPro.git二、拉取 Docker 镜像 docker pull zmister/mrdoc:v7三…

yarn安装第三方插件包,提示报错,yarn的镜像源已经过期了,因为yarn和npm用的是淘宝的镜像源,淘宝的镜像源已经过期了,要设置最新的淘宝镜像源。

淘宝最新镜像源切换_淘宝镜像-CSDN博客 查看yarn用的什么镜像源 yarn config get registry 查看具体的信息 yarn config list 设置淘宝的最新镜像源&#xff0c;yarn和npm都要设置最新的淘宝镜像源&#xff0c;不然还是报错 npm config set registry https://registry.npmm…

211毕业38岁产品经理被裁瞒着妻子送外卖

估计很多人都看到这个新闻了&#xff0c;微博和知乎也都上了热搜榜。 有人说&#xff0c;这一看就是摆拍的&#xff0c;谁没事在家里装个摄像头啊&#xff0c;这两人演技也还差点意思。 也有人说&#xff0c;大丈夫能屈能伸。虽然211毕业在互联网大厂工作&#xff0c;但是被裁了…

代码随想录算法训练营Day42|0-1背包理论基础、416. 分割等和子集

目录 0-1背包理论基础 0-1背包问题 二维dp数组01背包 算法实现 一维dp数组01背包 ​编辑算法实现 416. 分割等和子集 前言 思路 算法实现 总结 0-1背包理论基础 0-1背包问题 题目链接https://kamacoder.com/problempage.php?pid1046 有n件物品…

一、Kotlin 开发环境搭建

1. Kotlin 官网 https://kotlinlang.org/ 2. Kotlin 命令行工具下载 下载网址&#xff1a; https://github.com/JetBrains/kotlin/releases/tag/v1.3.50 切换其他版本&#xff0c;改下版本号即可 下载 kotlin-compiler-1.3.50.zip 文件即可 解压 kotlin-compiler-1.3.50.zip…