FlinkKafkaProducer 源码分析

news2024/11/20 0:26:47

initializeState

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述在这里插入图片描述
先查询是否开启isCheckpointingEnabled配置,如果没开,但是使用了EXACTLY_ONCE或者AT_LEAST_ONCE语义,就报错。

在这里插入图片描述
在这里插入图片描述
然后从checkpoint中保存的state中读取nextTransactionalIdHintState。
在这里插入图片描述
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR现在使用v2版本,如果checkpoint中保存的是v1版本的,则调用migrateNextTransactionalIdHindState方法进行迁移。

然后读取taskName,taskName有最大字数限制,如果大于maxTaskNameSize则截断。
在这里插入图片描述
在这里插入图片描述

维护事务ID池子

然后初始化TransactionalIdsGenerator。
在这里插入图片描述
在这里插入图片描述在这里插入图片描述
TransactionalIdsGenerator保证了不同子任务之间使用和abort的事务id号不会重复,互相之间没有干扰。
并发子任务运行在同一个operator上,prefix为taskName+OperatorUniqueID,因此多个并发子任务的prefix相同。
因此入参还传入了参数subtaskIndex、totalNumberOfSubtasks,来区分不同的子任务

在这里插入图片描述
在这里插入图片描述
前面文章
Flink+Pulsar、Kafka问题分析及方案 – 幂等性
分析过,**Kafka因为一个事务ID只能支持对最后一个事务的commit幂等性,那么当flink出现并发checkpoint的情况时就有可能出错,**因此kafka使用了一个事务ID池子,只要并发checkpoint的个数小于事务ID池子的大小,那么就不会出错。
事务ID池子的大小默认为DEFAULT_KAFKA_PRODUCERS_POOL_SIZE,即5个。

每个子任务使用的事务ID池子是固定的,会把事务ID池子保存到userContext,userContext保存在state里,从而作为快照的一部分存储起来,当从checkpoint/savepoint启动时,就会读取前面保存的事务ID池子。

  • 如果从checkpoint/savepoint中启动
    org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction#initializeState
    在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述
    从checkpoint中恢复userContext后,调用finishRecoveringContext方法,从userContext恢复事务ID池子,设置进availableTransactionalIds。

  • 如果不从checkpoint/savepoint中启动,则没有恢复userContext,会调用initializeUserContext来初始化userContext。
    在这里插入图片描述在这里插入图片描述在这里插入图片描述
    KafkaTransactionContext就只保存了事务ID池子。
    在这里插入图片描述
    调用transactionalIdsGenerator.generateIdsToUse来生成该任务使用的事务ID池子,需要传入一个入参nextTransactionalIdHint.nextFreeTransactionalId,它是用来帮助确定生成安全的事务ID号的,后续会进一步分析它。
    在这里插入图片描述在这里插入图片描述
    所有并发子任务的id池子为[nextFreeTransactionalId,nextFreeTransactionalId + parallelism * kafkaProducersPoolSize)
    单独某个子任务的id池子为[nextFreeTransactionalId + subtaskIndex * poolSize, nextFreeTransactionalId + (subtaskIndex+1) * poolSize)

维护nextTransactionalIdHint

下面看nextTransactionalIdHint是怎么维护的。
首先要对比分析一下TwoPhaseCommitSinkFunction#state
在这里插入图片描述在这里插入图片描述
注记:这行代码除了从checkpoint中获取operator state,还起到注册该operator state的作用。
在这里插入图片描述
state状态的redistribution scheme是round-robin pattern,比如说有10个子任务,则一个checkpoint中会保存10个state,如果并发度降低为5,则这10个state会以round-robin模式分配给这5个子任务。
这也是为什么TwoPhaseCommitSinkFunction#initializeState方法里要对state用for循环遍历,因为启动实例的时候同一个子任务可能分配到了多个state。

在这里插入图片描述
nextTransactionalIdHintState与前面的state成员不一样,它注册的state的redistribution scheme是broadcast pattern,即一个checkpoint里包含的该state会分发给所有子任务,所有子任务读取到相同的state内容。
在这里插入图片描述
在这里插入图片描述在这里插入图片描述

继续看FlinkKafkaProducer#initializeState方法。
在这里插入图片描述
如果不从checkpoint启动,则这里transactionalIdHints的size为0,如果从checkpoint启动,则size应该为1,大于1是不合法的。
因此,

  • 如果size为1,则直接设置进nextTransactionalIdHint变量即可。

  • 如果size为0,则说明不是从checkpoint启动,则可能有下面一些情况:

    • 第一次启动该任务,
    • 前面启动过任务,但是从来没成功打过checkpoint
    • 用户启动任务执行一些测试,后面正式生产时不从checkpoint启动

除了第一种情况,其他情况都可能会残留一些open的事务,因此需要abort掉。
abort哪些事务也是一个值得关注的问题,这个后续再分析。

nextTransactionalIdHintState作为一个新增的operator状态,同样在snapshotState方法中保存进Checkpoint里。

snapshotState

在这里插入图片描述
先调用TwoPhaseCommitSinkFunction#snapshotState把state成员给保存下来。
但是Kakfa connector还有一个状态nextTransactionalIdHintState。
因此接下来的代码把nextTransactionalIdHintState给更新,并保存到状态后端里。
在这里插入图片描述
为了避免重复保存nextTransactionalIdHintState变量(因为它的redistribution scheme是broadcast pattern),只由第一个子任务(即index为0)来维护与保存即可。

注意:不从checkpoint启动时,nextTransactionalIdHint是初始化为
在这里插入图片描述
因此生成id池子时传入nextFreeTransactionalId为0。
在这里插入图片描述
因此所有子任务生成的ID范围为[0,parallelism * kafkaProducersPoolSize)

当任务开启打第一个checkpoint时,这里判断

getRuntimeContext().getNumberOfParallelSubtasks()
                    > nextTransactionalIdHint.lastParallelism

肯定为true,因此更新nextFreeTransactionalId为parallelism * kafkaProducersPoolSize,这就是一个安全的事务ID起点。当前使用的事务ID号都比它小。

Flink任务停止后,再次启动时是可以修改子任务并发度的,增加或者减小都有可能,因此从checkpoint恢复时,如果并发度增加为parallelism2,新的子任务使用的ID号就会超过nextFreeTransactionalId,所有子任务使用的ID范围为[0,parallelism2 * kafkaProducersPoolSize)
因此需要更新nextFreeTransactionalId为parallelism2 * kafkaProducersPoolSize
然后保存进nextTransactionalIdHintState里。

根据对代码的分析,nextTransactionalIdHint.nextFreeTransactionalId只会在下面调用链中使用到:generateNewTransactionalIds -> initializeUserContext
在这里插入图片描述
而由于Kafka connector会把ID池子保存进userContext里,进而保存进checkpoint里,因此这块代码只有在读取不到userContex时才会调用到。

而读取不到userContex有下面情况:

  • 大多数情况下不从checkpoint/savepoint中启动才会读取不到userContex,不从checkpoint/savepoint中启动,则nextTransactionalIdHint.nextFreeTransactionalId肯定为0,即Flink并发子任务生成的事务ID是使用
    TransactionalIdsGenerator#generateIdsToUse(0)
    来生成的。
  • 如果增加并发度,新增的某些子任务会读取不到state、userContext,因此会调用generateNewTransactionalIds -> initializeUserContext,但是因为nextTransactionalIdHint这个operator状态是广播分发的,因此nextTransactionalIdHint.nextFreeTransactionalId不为0
    要验证这个事情可以如下实验:
    先并发度为1,则[0,5)
    然后并发度为2,检测多出来的一个子任务使用的事务ID范围是否为[10,15),而不是[5,10)。

如果证实了,则就可能有bug!
因为后面的generateIdsToAbort方法是基于nextFreeTransactionalId为0的假设!
如果新增的子任务创建的事务ID超过了generateIdsToAbort方法预估事务ID的上限,则会导致OPEN事务的残留。
其实nextFreeTransactionalId的本身貌似没有用处?

Abort残留的事务

不从checkpoint/savepoint中启动

下面分析前面提到的,不从checkpoint/savepoint中启动时,需要调用abort方法来把残留的事务给终结掉。
在这里插入图片描述
这里有一个问题需要解决的:由于不从checkpoint/savepoint中启动,因此我们无法得知之前启动时的并发度,假设前面启动时的并发度为P1,当前启动的并发度为P2,因此前面的任务执行使用的事务ID范围为[0,P1 * PoolSize),我们需要把这些事务ID都abort一次,但是P1是不可知的。

  • 如果P2大于P1,即增加并发度,则[0,P2 * PoolSize)肯定包含[0,P1 * PoolSize),此时对[0,P2 * PoolSize) 遍历abort一次即可。
  • 如果P1大于P2,即降低并发度,则[0,P2 * PoolSize)是[0,P1 * PoolSize)的子集,此时无法猜测要abort多少事务ID。
    在这里插入图片描述
    Kakfa的解决方法是,设定一个参数safeScaleDownFactor,即Flink任务减低并发度的比例不能超过这个,默认值为5。
    比如说,第一次启动Flink任务的并发度为10,则第二次启动Flink任务的并发度至少为10/5=2。
    在这里插入图片描述
    通过这种方式,我们得知P1、P2的关系:P2*5>=P1
    [0,P2 * 5 * PoolSize)肯定包含[0,P1 * PoolSize)
    因此,我们对[0,P2 * 5 * PoolSize)范围内的事务ID号都abort一次即可。
    注意这里用到了一个假设:生成事务ID时所用的入参nextFreeTransactionalId是0.

另外,由于Pulsar、Kafka的topic分区数都只能增加,不能减小;而Flink任务的并发度往往跟分区数相关,因此Flink任务的并发度调整大多只会增加,而不会减小,就算减小也不会减小很多,因此,Kafka connector这种做法其实也还行。

在这里插入图片描述
另外,abortTransactions事务的方式不是发送abort请求,而是调用
kafkaProducer.initTransactions();
来初始化该事务ID对应的Producer,此时服务端会关闭处于正在进行但还未进行提交的事务,同时服务端会对epoch进行递增。

从checkpoint/savepoint中启动

除了“不从checkpoint/savepoint中启动”这个case需要abort掉残留的事务,Flink从Checkpoint/Savepoint启动时也有可能残留open的事务
如果Flink任务成功执行了initializeState方法,即成功创建了新事务,但是在成功执行snapshotState前就失败挂掉了,则这个新创建的事务也是无法记录到Checkpoint/Savepoint里的,因此也会导致遗漏某些事务没有被完结。
在这里插入图片描述
这种case也是很常见的。

下面看看Kafka connector是怎么处理的。
TwoPhaseCommitSinkFunction#initializeState ->
在这里插入图片描述
FlinkKafkaProducer#finishRecoveringContext ->
在这里插入图片描述
FlinkKafkaProducer#cleanUpUserContext
在这里插入图片描述
从Checkpoint/Savepoint启动,TwoPhaseCommitSinkFunction#initializeState方法会对checkpoint中存储的未完成的事务ID(即pendingTransaction、pendingCommitTransactions)调用recoverAndCommit、recoverAndAbort,然后把这些事务ID加入到handledTransactions里。
在这里插入图片描述
但是根据前面分析,可能有一些未完结的事务并没有存储进快照里,即pendingTransaction、pendingCommitTransactions里面。
因此我们还需要补充abort那些不在快照里的事务ID,因为Kafka connector的事务ID都是固定的,都在池子里存储着,而Context里存储了事务ID池子,因此恢复userContext后,即从userContext读取出来事务ID池子后,刨除掉handledTransactions的那一部分事务ID,剩下的事务ID都abort一次即可。

因此会调用finishRecoveringContext -> cleanUpUserContext方法,把剩下的事务ID都abort一次。
最后finishRecoveringContext方法会把读取出来的事务ID池子设置进当前任务使用的事务ID池子。

recoverAndCommit、recoverAndAbort

这部分在Flink+Pulsar、Kafka问题分析及方案 – 幂等性中也分析过,不过那里是以Kafka、Pulsar对比的角度来分析的,这里再以Flink Kafka Connector的源码分析角度来进行介绍。
在这里插入图片描述
producer = initTransactionalProducer(transaction.transactionalId, false);
就是创建一个简单的Producer。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
注记:这里构造返回的是FlinkKafkaInternalProducer,它封装了KafkaProducer,并且提供了resumeTransaction等有用的方法。

注意:这里创建完事务Producer后,并没有按照Kafka事务的官方规范的使用方式,即创建完事务Producer后,立马调用producer.initTransactions(),如下例:

KafkaProducer producer = createKafkaProducer(
  “bootstrap.servers”, “localhost:9092”,
  “transactional.id”, “my-transactional-id”);

// 初始化事务,这里会向 TC 服务申请 producer id
producer.initTransactions();

这是因为:如果调用initTransactions方法,会立马把当前事务ID对应的事务都结束掉,然后把epoch值递增1,从而把旧的Producer给fence掉,这个是Kafka为了解决僵尸实例而设计的一个机制。
但是Flink必须要对旧的事务进行commit、abort,因此需要做一些trick,调用FlinkKafkaInternalProducer#resumeTransaction方法。
producer.resumeTransaction(transaction.producerId, transaction.epoch);
在这里插入图片描述
在这里插入图片描述
通过反射的方式,将事务Producer的producerId、epoch设置好,并将事务的本地状态转换为IN_TRANSACTION,从而能发送commit、abort请求。

最后再调用producer.commitTransaction()来发送commit请求。

在这里插入图片描述
然后是异常处理了,根据Flink+Pulsar、Kafka问题分析及方案 – 幂等性的分析可知,InvalidTxnStateException、ProducerFencedException这两种异常类型直接catch掉不报错即可。
最后把这个Producer关闭掉,因为现在旧的事务都已经处理完了,可以把这个旧的Producer给close掉,后续开启新的事务Producer。

在这里插入图片描述
recoverAndAbort方法则直接调用initTransactions方法来把旧事务给abort掉,比较简单。

beginTransaction、Commit、Abort

beginTransaction

在这里插入图片描述
可以看到,每次创建新事务时,都会创建一个新的Producer来工作。因此后面每次abort、commit完成时,也会把对应的Producer给close掉。
在这里插入图片描述
这里创建事务Producer时,就是走的官方流程,创建成功后会调用producer.initTransactions()。
事务ID要从池子availableTransactionalIds里面取,如果取不到,则直接抛错,因为并发checkpoint的个数超过池子事务ID的个数时,会发生冲突。

commit

在这里插入图片描述
在这里插入图片描述
commit完成后,就把该Producer对应的事务ID回收到池子availableTransactionalIds里。
因为每次创建新事务的时候,都是创建新的Producer来工作,因此这里每次abort、commit完成时,也会把对应的Producer给close掉。

abort

类似。
在这里插入图片描述

open、close、invoke

open方法

在创建初始化实例时会调用open方法。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
logFailuresOnly变量为true时,遇到写入报错时只记录错误日志,不抛错;为false时,则把报错保存到asyncException成员里,checkErroneous方法会检查asyncException成员是否有内容,如果有则抛出来。
在这里插入图片描述
因此,open方法先根据logFailuresOnly配置设置好callback成员。

invoke方法

invoke方法在发送消息时,把callback成员设置成发送消息的回调。
在这里插入图片描述

close方法

在这里插入图片描述
close方法先把currentTransaction的内容给flush一下,最后把pendingCommitTransactions、currentTransactionHolder对应的Producer都关闭掉,这里把代码放在finally代码块里,是为了避免前面抛错导致没有执行close。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/541299.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

表情、特殊字符、字符串截取

码元与码点 关于码元和和码点,通过一个例子进行介绍。 如图,字符串😊只有一个“笑脸”符号,但是通过length属性发现,“长度”为2,string.length到底表示什么? 答:码元的个数 什么是…

C语言爬取HTML-爬取壁纸 文末附源码

前言:这学期计算机软件课程设计的其中一个题目是使用C语言爬取HTML,本打算使用C语言的CSpidr库来实现,但是因为它的依赖liburi没有找到在哪里安装,所以放弃了这个想法,使用的是curl以及libxml2这两个库,能够…

这几款实用且有趣的软件不容错过

软件一:天若ocr 这款Windows平台的天若OCR文字识别工具一定更适合你。 软件作者来自天若游心,我爱破解。 发布一年半以来一直深受好评,更新速度也非常快。 最近,它增加了批次识别功能。 软件二:腾讯柠檬精简版 除了Q…

P2233 [HNOI2002]公交车路线

题目描述 在长沙城新建的环城公路上一共有 8 个公交站,分别为 A、B、C、D、E、F、G、H。公共汽车只能够在相邻的两个公交站之间运行,因此你从某一个公交站到另外一个公交站往往要换几次车,例如从公交站 A 到公交站 D,你就至少需要…

keycloak介绍与使用示例,超时时间设置

keycloak介绍 Keycloak是一款由Red Hat开源社区开发的开放源代码的身份和访问管理解决方案,它提供了安全的单点登录(SSO)、多因素身份验证、社交登录和基于角色的访问控制等功能Keycloak基于OAuth 2.0和OpenID Connect协议,并支持SAML 2.0,可…

spring Bean的循环依赖问题

public class Husband {private String name;private Wife wife;public void setName(String name) {this.name name;}public String getName() {return name;}public void setWife(Wife wife) {this.wife wife;}// toString()方法重写时需要注意:不能直接输出wif…

【计算机网络基础】章节测试2 物理层

文章目录 判断题选择题辨析题应用题 判断题 现在的无线局域网常用的频段是2.8GHz和5.4GHz。 多模光纤只适合于近距离传输。√ 数据在计算机内部多采用串行传输方式,但在通信线路上多采用并行传输方式。 统计时分复用可以按需动态分配时隙。√ 相对于同步时分复用…

卷麻了,新来的00后实在是太卷了...

在程序员职场上,什么样的人最让人反感呢? 是技术不好的人吗?并不是。技术不好的同事,我们可以帮他。 是技术太强的人吗?也不是。技术很强的同事,可遇不可求,向他学习还来不及呢。 真正让人反感的,是技术平平&…

geoserver切片数据本地缓存和层级配置

很多业务场景中,我们会用到图层切片功能,默认情况下,每次调用都是新的重新切片,这样在性能上存在一定问题;基于此我们可以进行本地缓存切片,及此地理位置只进行一次切片处理,数据缓存在本地磁盘…

基于Python实现线性分类器

访问【WRITE-BUG数字空间】_[内附完整源码和文档] 在机器学习领域,分类的目标是指将具有相似特征的对象聚集。而一个线性分类器则透过特征的线性组合来做出分类决定,以达到此种目的。对象的特征通常被描述为特征值,而在向量中则描述为特征向…

Visual studio 配置intel realsense sdk环境

前面的部分已经有博主写过了,这里就不赘述了,附上链接:VS配置使用realsense相机SDK 仅仅配置上述文章中的部分,运行realsense example的部分例子程序时会出现找不到glfw3.h的问题。我查找了github上的提问发现这个问题原因是没有…

网络安全中NISP二级证书和CISP证书的优势有什么

优势?那就需要多个点展开说说啦~ 1.权威含金量高 我国信息安全领域唯一的国家级注册安全专业人员证书 2.就业面宽,企业优先选择证书持有者 通过专业培训和考试提高个人信息安全从业水平,证明具备从事信息安全技术和管理工作的能力&#x…

opencv二值化详解

大家好,今天来跟大家讲讲 opencv二值化。 先从一个比较经典的方法开始讲解,看 opencv官方文档: 二值化(binary)的定义:在一个输入图像中,将其一个像素点设置为0,将其两个像素点设置为1。 二值化…

开发笔记之:文件读取值溢出bug分析(QT C++版)

(1)引言 以下是QT C读取数据文件(QDataStream)的代码: /*** 按双字读取* param fis 文件输入流* param isBigEndian 是否大头(字节序)* return 双字值*/ DWORD FsFileUtil::readAsD…

怎么用问卷工具做市场调研?

对于希望开发新产品或服务、拓展新市场或确定潜在客户的公司来说,市场调查是一个至关重要的过程。然而,进行市场调查可能既耗时又昂贵,特别是在涉及对大量人群进行调查的情况下。今天,小编将来聊一聊调查问卷工具如何帮助企业进行…

微信小程序-基础知识

文章目录 AppIdOpenIDUnionId处理方法session_key AppId appid 是微信账号的唯一标识,这个是固定不变的; 如果了解微信公众号开发的就需要注意一下,小程序的appid 和 公众号的appid 是不一致的 OpenID 为了识别用户,每个用户针…

如何成为自动化测试工程师?8年测试总结,自动化测试岗晋升的技能...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 Python自动化测试&…

程序设计入门——C语言2023年5月18日

程序设计入门——C语言 第二周 计算表达式 课程来源:链接: 浙江大学 翁恺 程序设计入门——C语言 学习日期:2023年5月18日 第二周 计算 表达式 有两个变量a和b,交换a和b的值。 老师举例:有两杯液体,一杯茶&#xff…

03)FastDFS配置nginx 服务,使用http方式访问图片

FastDFS是没有文件访问功能的,需要借助其他工具实现图片HTTP访问的。 没安装nginx时比如前端html网页想获取 FastDFS的一张图片显示,需要java写个controller,然后使用 FastDFS-java client客户端调用文件获取api,HttpServletResponre在返回图片流.给前端显示。 安装了nginx…

HTTP介绍、原理

HTTP 与 HTTPS 有哪些区别? HTTP 是超文本传输协议,信息是明文传输,存在安全风险的问题。HTTPS 则解决 HTTP 不安全的缺陷,在 TCP 和 HTTP 网络层之间加入了 SSL/TLS 安全协议,使得报文能够加密传输。HTTP 连接建立相…