Pulsar幂等性开发的设计文档

news2025/1/12 20:38:32

PIP:
https://github.com/apache/pulsar/issues/19744

具体设计

  • 每个TC维护一个Map<ClientName,List> terminatedTxnMetaMap,维护每个客户端最新N个事务的状态,事务结束前,会把事务元数据写入这个List里,同时写入一个Compacted topic(persistent://pulsar/system/_terminated_txn_state${TCid})里持久化,Compacted topic的key为ClientName,从而能自动删除旧的事务元数据。

  • TC recovery时从Compacted topic里恢复维护的事务状态信息。

  • 可以考虑只保留aborted的事务状态,committed状态的事务不用保留,那么如果找不到事务元数据时,默认是committed的事务。
    为了避免用户不清楚pulsar能提供的能力,导致丢数时抱怨pulsar侧不抛错,这里还是选择同时保留committed、aborted的事务,如果在Preserver找不到事务元数据时仍然会抛出TransactionNotFound错误。

  • 为了避免某个clientName废弃了,导致对应的事务元数据一直不清除,开启一个定时任务来清除内存中过期的事务元数据。

  • __terminated_txn_state_${TCid}的创建利用topic自动创建机制,因此分区数、是否为分区topic由broker.conf对应的配置决定,这个跟__transaction_buffer的机制是相同的。为__terminated_txn_state_${TCid}开启compaction也是类似的机制,persistent://pulsar/system/__terminated_txn_state_${TCid}是系统topic,默认强制开启compaction机制。

  • TransactionMetadataPreserver的实现不需要进行并发控制,因为操作都是在单线程池里执行。replay方法在TransactionMetadataStoreService的单线程池里执行,append、flush、expireTransactionMetadata方法在MLTransactionMetadataStore里的单线程池里执行。

  • batch跟key机制不相容。batch的第一条消息的key被视为是整个batch消息的key,因此开启batch机制会让compaction机制执行错误。如果开启key based batching机制可以保证同一个batch里的key都相同,但是针对compaction机制而言,同一个key的消息只有最新一条消息是有效的,因此还不如通过代码逻辑去避免无效消息的发送。因此设计流程如下:
    变成aborting后修改内存数据,并记录将要发送消息的key;然后发送指令给tb,执行成功后会尝试变成aborted,但是变成aborted之前先清除事务元数据里的分区信息,然后遍历将要发送消息的key,同步发送到__terminated_txn_state。这样就可以压缩一下相同key的消息量了,而且避免了batch机制。

  • 如果代码逻辑链路如下:写日志条目,更新状态为Aborted -> 成功写入__terminated_txn_state -> 删除日志

则如果在 写日志条目,更新状态为Aborted -> 成功写入__terminated_txn_state 中间挂掉,recovey扫描日志时会直接把事务变成aborted,而不会写入__terminated_txn_state。因此要保证 成功写入__terminated_txn_state写日志条目 之前
因为成功写入__terminated_txn_state 在更新事务状态为ABORTED之前,因此写入的事务元数据是aborting,因此对于Preserver保存的事务数据,我们应当视aborting等同于aborted,因为aborting的事务最终只可能为aborted,而如果到了查询Preserver里的数据的逻辑,那此时肯定已经抛出TransactionNotFound错误,已经删除掉对应的事务元数据了,因此此时肯定是aborted了。

  • 因为一旦对某个事务调用append方法,后面就可能立刻flush到__terminated_txn_state里,因此要保证append到Preserver在内存更新状态为 aborting后面,不然的话topic里的事务元数据可能为OPEN状态,如果只存储aborted事务数据倒还好,无所属事务状态,如果把committed的事务数据也存储进来,那么就很有所谓了,因此为了向前兼容,这里也应该要保证append到Preserver内存更新状态为 aborting后面。

因此整条链路如下: OPEN状态 -> 把aborting log entry写入tc log -> 内存更新状态为 aborting -> append到Preserver -> 发送指令给TB、TP -> flush元数据到__terminated_txn_state -> 把aborted log entry写入tc log -> 内存更新状态为aborted
整条链路的每个步骤都是幂等的,因此能保证是幂等的。

  • 关注一下recover完成后的逻辑是否正确:由于 recover时不调用Preserver的逻辑 ,open状态不用分析,aborted也不用分析,因为已经走完流程了。那么分析一下aborting的事务, aborting的事务可能对TB发送过endTxn指令,可能写入元数据到__terminated_txn_state里,也可能都没有。 recoverTracker会对aborting事务重新abort,因此会重新触发上面两个步骤,因为它们也保证幂等的,因此能保证行为正确。

注记:recovery后的事务状态,如果为OPEN,则肯定没走到把aborting log entry写入tc log这一步;如果为ABORTING,则至少走完了把aborting log entry写入tc log,最多不会走到把aborted log entry写入tc log;如果为Aborted,则至少走完了把aborted log entry写入tc log

  • 序列化事务元数据时,其实只需要把事务ID号序列化即可,因此数据量是很小的,这里实现时顺便把事务状态也序列化了。至于其他成员数据量比较大,如分区信息,都不序列化。因此TransactionMetaPersistCount的上限,瓶颈不在于读写__terminated_txn_state,而在于维护太多事务元数据,对内存的占用。

  • 在要变成aborted之前,可以把分区信息给清除掉,降低内存占用。因为tc发送给tb的命令肯定执行成功了。删除分区信息后立马写入topic。写入成功才变成aborted。分区信息为空列表后,后面再次触发endTxnInTransactionBuffer会直接返回成功。

  • 实际保存的事务量:因为客户端在创建事务时,实际上是按round robin模式选TC的,比如说默认创建16个TC,则会轮询使用16个TC来服务它。如果TC Preserver设置TransactionMetaPersistCount为5,则实际上服务端侧会为客户端保存5*16=80个最近完成的事务元数据,如果单个客户端的事务吞吐量为5txn/s,那么TC Preserver仅能为客户端保存80/5=16s时间范围内的事务元数据。
    但是也会有异常情况,如果集群异常,最终只有一个TC能提供服务,那么就只能保存5个最近的事务状态了,当然这是极端异常的情况了。

  • 客户端侧不可避免地需要实现一套commit/abort失败后自动重试commit/abort的机制,虽然对于某些报错,如TransactionCoordinatorNotFound、TransactionPreserverClosed错误,客户端在接收到这两种错误类型时会自动重新发送请求给broker,但是对于其他报错类型是直接返回给用户的,如PulsarClientException.TimeoutException,超过一定时间请求没有响应客户端也会收到报错,这种情况下用户实际上是不知道事务的真实状态的,因此必须要重试commit。尽管把operation timeout调大可以规避这个报错,但是这会影响到客户端的其他行为,因此,客户端侧不可避免地需要实现一套重试commit/abort的机制。

  • 该模块依赖topic compaction,依靠topic compaction机制来读取同一个clientName的最新一份事务元数据,如果滚动重启时从__terminated_txn_state读数据时无法保证是最新的,那么就可能报TransactionNotFound错误。因此需要保证从__terminated_txn_state读取数据时是跟写入顺序一致的
    因为无论是RoundRobinPartition还是SinglePartition路由模式,只要消息带上了Key,则消息都会根据Key的hash值来选取分区,因此同一个client的事务元数据都会发往同一个分区,因此我们只需要保证消息发送是同步的即可。
    实验也证明了这一点:
    在这里插入图片描述

  • 目前TC log batch功能有问题,开启后会导致事务状态发生回滚,从而导致丢数,目前先禁用,对性能影响也不大。
    https://github.com/apache/pulsar/issues/20150

增加配置

服务端配置

   @FieldContext(
            category = CATEGORY_TRANSACTION,
            doc = "Max number of txnMeta of aborted transaction to persist in broker."
            + "If the number of aborted transaction is greater than this value, the oldest aborted transaction will be "
            + "removed from the cache and persisted in the store."
            + "default value is 0, disable persistence of aborted transaction."
    )
    private int TransactionMetaPersistCount = 0;

    @FieldContext(
            category = CATEGORY_TRANSACTION,
            doc = "Time in hour to persist the transaction metadata in TransactionMetadataPreserver."
    )
    private long TransactionMetaPersistTimeInHour = 72;

    @FieldContext(
            category = CATEGORY_TRANSACTION,
            doc = "Interval in seconds to check the expired transaction in TransactionMetadataPreserver."
    )
    private long TransactionMetaExpireCheckIntervalInSecond = 300;
  • TransactionMetaPersistCount暂定为5,即整个集群正常情况下能为每个客户端保存5*16=80个已完成的事务元数据。
  • TransactionMetaPersistTimeInHour为72,即超过3天的事务元数据会被定时任务自动删除掉。
  • TransactionMetaExpireCheckIntervalInSecond为300,即检查过期事务元数据的定时任务每5min执行一次检查。
  • __terminated_txn_state 的分区数待定,根据集群的事务吞吐量来决定,而且这个可以动态进行扩分区。

客户端配置

   @ApiModelProperty(
            name = "clientName",
            value = "Client name that is used to save transaction metadata."
    )
    private String clientName;

修改传输协议

enum ServerError {
    ...
    TransactionPreserverClosed = 26; // Transaction metadata preserver is closed
}

message CommandNewTxn {
    required uint64 request_id = 1;
    optional uint64 txn_ttl_seconds = 2 [default = 0];
    optional uint64 tc_id = 3 [default = 0];
    optional string client_name = 4;
}

message CommandEndTxn {
    required uint64 request_id = 1;
    optional uint64 txnid_least_bits = 2 [default = 0];
    optional uint64 txnid_most_bits = 3 [default = 0];
    optional TxnAction txn_action = 4;
    optional string client_name = 5;
}

message TransactionMetadataEntry {
  ...
  optional string clientName = 12;
}

异常处理

注记:记当前分析的commit报错的事务对应的checkpoint为checkpoint n,要回滚的话,就回滚到 checkpoint n-1。

Pulsar侧commit失败时可能的错误类型:

  • TransactionNotFound:事务元数据已经被删除了,Pulsar侧不确定事务最终状态,可能committed,也可能是aborted。

    • 策略1:flink catch到该异常直接终止任务,此时pulsar侧人工介入排查,通过人工读取topic信息来获取事务真实状态,如果是committed就忽略;如果是aborted就从checkpoint n-1恢复重新执行即可。但是用户不一定能接受任务的终止,而且pulsar侧人工介入排查也需要耗费不短时间,因为需要扫描大量的数据,这会对用户方的任务造成较长的不可用时间,而且无法保证一定能找到该事务的数据。因此不采纳。
    • 策略2:仿照Kafka的行为,忽略该报错并打日志,或者再加一个监控,让用户知道有丢数风险。同样地这也是极端中的极端情况了,有了Pulsar幂等性功能不应该出现这种报错。
  • InvalidTxnStatusException:重新commit已经被超时aborted的事务,flink需要抛出该异常,从checkpoint n-1恢复重新执行即可保证一致性。
    注记:不可能重新abort已经被committed的事务,因为flink侧不会对一个事务调用一下commit,再调用一下abort。

  • TimeoutException:请求超时,这种错误重启Flink任务重新commit即可,或者不重启任务直接重试commit即可。

  • 其他错误类型:重启Flink任务重新commit即可,因为Pulsar滚动重启过程中commit可能产生各种报错,但是只要服务恢复正常重试commit就能成功。

测试

一致性测试

压测的场景是:只有事务Producer的场景,当然Flink+Pulsar也是只用到事务Producer。
下面结果是使用下面压测工具完成的。
https://github.com/apache/pulsar/pull/19781

配置:单个client,TransactionMetaPersistCount为1000,__terminated_txn_state分区数为1。
此时事务吞吐量为800txn/s。

那么可以分析一下,这个配置下能为客户端保存多长时间范围内的事务元数据?因为客户端在创建事务时,实际上是按round robin模式选TC的,比如说默认创建16个TC,则会轮询使用16个TC来服务它。如果TC Preserver设置TransactionMetaPersistCount为2,则实际上服务端侧会为客户端保存2*16个最近完成的事务元数据。

因此,当前配置会为客户端保存1000*16=16000条元数据,吞吐量为800txn/s,则能保存16000/800=20s的事务元数据。

测试情景结果
无干扰1e条无重复无丢失
滚动重启一次1e条无重复无丢失
宕机不可用时间5min1e条无丢失,重复一个事务的数据,即800条。排查发现原因是:该事务元数据找不到,即TransactionNotFoundException。因此保存20s的事务元数据还不够,需要保存更多数据才能避免报TransactionNotFoundException错误。

注记:因为宕机时间长达5min,为了模拟Flink的使用场景,这里事务超时时间设为10min;因为Pulsar客户端对发出去的请求还会有一个超时时间限制,即operation-timeout,默认为30s,超过这个时间请求没有响应也会报错,要避免这个报错可以把operation-timeout也调为10min,比宕机时间长即可,或者客户端有重试请求的机制,则也可以不设置该配置,遇到这个报错客户端重试commit即可。当前Flink commit遇到报错会重启任务重新commit,因此是可以不设置operation-timeout的。

下面把TransactionMetaPersistCount调大为2000,继续测试。
配置:单个client,TransactionMetaPersistCount为2000,__terminated_txn_state分区数为1。
但是事务吞吐量下降为500。(后续可以调大__terminated_txn_state分区数来优化,而且TransactionMetaPersistCount实际使用不能调这么大,也不能让用户搞这么高的事务吞吐量,单个客户端事务并发量高的话还可以调小compaction的时间间隔来减少recovery时读取的数据量,从而加快recovery,这里测试优先关注数据的一致性,暂时不考虑其他的)

根据上面信息可知,当前能保存2000*16/500=64s时间范围内的事务元数据。
在这里插入图片描述

测试情景结果
宕机不可用时间5min1e条无重复无丢失
滚动重启10次10e条无重复无丢失

以上实验重复多次,均通过一致性检验,无重复、无丢失。

性能测试

两步拆分的压缩效果

分成两步的消息压缩效果如下:

总共完成了125000个事务。

2023-04-18T12:18:50,999+0800 [Thread-0] INFO  org.apache.pulsar.testclient.PerformanceProducer - --- Transaction : 125000 transaction end successfully -      -- 2 transaction end failed --- 125002 transaction open successfully --- 5 transaction open failed --- 0.001 Txn/s
cat logs/pulsar-broker-sg-testhdfs-dn6.bigdata.bigo.inner.out.1 | grep -a "No need to flush transaction" | wc -l

在这里插入图片描述
在这里插入图片描述
把49.07%的消息量给压缩掉了。

多次测试下来,能把25%~60%的消息给压缩掉,当然这是在客户端高事务并发量的情况下的结果,低并发的情况下压缩效果应该会差很多。

内存占用分析

在这里插入图片描述
一个事务元数据大概按400byte来算。

估算一下的话,如果每个TC为client保存5个事务元数据,那么10w个客户端就会消耗190M,即每个TC需要消耗190M内存。
在这里插入图片描述
但是整个集群有16个TC,因为有16个TC轮询给客户端提供服务,因此实际上整个集群会为一个客户端保存5*16=80个事务元数据。

那么整个集群需要消耗190M*16=3G的内存。
在这里插入图片描述
注记:因为一个broker是可能加载多个TC的,因此可能出现极端情况:16个TC都加载到同一个broker上了,那么内存消耗全部由一个broker来承担,因此就会有OOM的风险,因此有必要在负载均衡策略上对于TC的分配上尽量均衡。当前是根据hash进行随机分配的,因此目前没有影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/457232.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

高分辨率光学遥感图像水体分类综述2022.03

本文是Water body classification from high-resolution optical remote sensing imagery: Achievements and perspectives的学习笔记。 相关资源被作者整理到&#xff1a;这里 文章目录 Introduction基本知识 挑战和机遇挑战1. 有限的光谱信息和小场景覆盖2. 形状、大小和分布…

开放原子训练营(第三季)RT-Thread Nano学习营北京站

开放原子训练营&#xff08;第三季&#xff09;RT-Thread Nano学习营北京站学习心得 文章目录 开放原子训练营&#xff08;第三季&#xff09;RT-Thread Nano学习营北京站学习心得RT-Thread简介会议议程介绍RT-Thread Nano介绍RT-Thread Nano实操训练总结 RT-Thread简介 RT-Th…

【网络安全】XXE--XML外部实体注入

XXE XXE定义XML初识菜鸟xml概念初识DTD解答疑虑1&#xff1a;&#xff01;DOCTYPE是干什么用的疑虑2&#xff1a;&#xff01;ELEMENT是干什么用的疑虑3&#xff1a;#PCDATA是干什么用的疑虑4&#xff1a;为什么元素要再次声明类型 内部实体和外部实体的区别内部实体外部实体通…

【Spring篇】DI相关内容

&#x1f353;系列专栏:Spring系列专栏 &#x1f349;个人主页:个人主页 目录 一、setter注入 1.环境准备 2.注入引用数据类型 3.注入简单数据类型 二、构造器注入 1.环境准备 2.构造器注入引用数据类型 3.构造器注入多个引用数据类型 4.构造器注入多个简单数据类型 …

OAuth2学习(实操OAuth微信登录)

文章目录 前言1 OAuth2基本概念2 网站应用微信登录2.1 大概流程2.2 前期准备2.3 将微信登录二维码内嵌到自己页面2.3.1 后端接口编写(向前端提供参数)2.3.2 前端显示二维码页面 2.4 编写回调接口2.4.1 回调接口根据code获取access_token 这个令牌2.4.2 回调接口根据access_toke…

大数据分析工具Power BI(十三):制作占比分析图表

文章目录 制作占比分析图表 一、饼图 二、环形图 三、树状图

赛题解析 | kaggle百万奖金新赛--图书墨水检测大赛

整理自kaggle平台 比赛题目 Vesuvius Challenge - Ink Detection kaggle-图书墨水检测 比赛背景 赫库兰尼姆卷轴中使用的墨水在X射线扫描中不容易显示出来。但我们发现机器学习模型可以检测到它。幸运的是&#xff0c;我们有地面实况数据。自从近300年前发现赫库兰尼姆Papyr…

【K8S系列】深入解析Service

序言 Dont count the days. Make the days count 不要数着日子。让日子过得有意义 文章标记颜色说明&#xff1a; 黄色&#xff1a;重要标题红色&#xff1a;用来标记结论绿色&#xff1a;用来标记一级论点蓝色&#xff1a;用来标记二级论点 Kubernetes (k8s) 是一个容器编排平…

React 组件 API

常用 React 组件 API&#xff1a; 设置状态&#xff1a;setState替换状态&#xff1a;replaceState设置属性&#xff1a;setProps替换属性&#xff1a;replaceProps强制更新&#xff1a;forceUpdate获取DOM节点&#xff1a;findDOMNode判断组件挂载状态&#xff1a;isMounted …

基础自动化测试脚本开发——Loadrunner网页系统两种创建关联函数的方法详解

前言 许多的系统都采用SessionID或SeqID等方法来标识不同的任务和数据报&#xff0c;应用在每次运行时发送的数据并不完全相同。所以&#xff0c;为了让脚本能够支持测试的需求&#xff0c;就必然要用某种机制对脚本的数据进行关联了。总之一句话&#xff1a;通过关联可以在测试…

AOD实践,modis数据下载,modis数据处理

modis数据下载-数据读取-重投影-拼接-均值 一、数据下载 1、Cygwin安装 Cygwin安装教程&#xff1a;https://blog.csdn.net/u010356768/article/details/90756742 1.2 数据采集 现提供遥感数据下载服务&#xff0c;主要是NASA数据&#xff0c;数据下载网站包括&#xff1a…

炸裂的 AutoGPT,鱼皮教你免费用!

大家好&#xff0c;我是鱼皮&#xff0c;继前段时间爆火的 ChatGPT 后&#xff0c;又一个炸裂的开源项目 Auto-GPT 出现了。 仅在最近 10 天&#xff0c;这个项目就收获了 8 万多个 star&#xff0c;目前总 star 数超过 10 万&#xff01; 那 Auto-GPT 到底是个什么玩意&#x…

一文让你熟练使用 JSONObject 和 JSONArray

依赖 导入阿里的 fastjson 依赖。 <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency>类型转换 String 与 JSON 相互转换 通过 JSONObject.parseObject…

21、越狱调试

前言 调试一款应用,使用重签名方案,很容易被第三方察觉.在越狱环境中,我们可以在不污染App的情况下,对第三方程序进行动态调试 一、Reveal Reveal 是一款UI调试工具,对iOS逆向开发非常有帮助. 在Mac电脑中,安装Reveal软件: 密码 xclient.info 在手机中,安装Reveal插件打开C…

【京东】商品评价数据采集+买家评论数据+卖家评论数据采集+行业数据分析+行业数据质检分析

采集场景 京东商品详情页中的评价&#xff0c;有多个分类&#xff1a;【全部评价】、【晒图】、【视频晒单】、【追评】、【好评】、【中评】、【差评】。其中【全部评价】默认展现&#xff0c;其他需点击后展现。本文以按【差评】筛选采集为例讲解。实例网址&#xff1a;http…

MySQL-----复合查询

文章目录 前言一、基本查询回顾二、 多表查询解决多表查询的思路 三、自连接四、子查询1. 单行子查询2. 多行子查询3. 多列子查询4. 在from子句中使用子查询5. 合并查询5.1 union5.2 unoin all 总结 前言 前面的学习中,对于mysql表的查询都是对一张表进行查询,在实际开发中这远…

快速入门git(收藏篇)

大致总结&#xff1a; 本地仓库要先去github注册&#xff0c;并通过github的验证。于是本地仓库的文件均可通过协议传输至github任意一个仓库。本地文件要先传到本地仓库&#xff0c;由本地仓库传输至远程github仓库。 在详细学习git之前&#xff0c;我们先来看看Git和svn之间…

系统集成项目管理工程师 笔记(第七章:项目范围管理)

文章目录 7.1.1 项目范围管理的含义及作用7.1.2 项目范围管理的主要过程&#xff08;6个&#xff09; 7.2 编制范围管理计划和范围说明书 2687.2.1 编制范围管理计划过程所用的工具与技术7.2.2 编制范围管理计划过程的输入、输出 7.3 收集需求 2717.3.1 收集需求过程的工具与技…

java 网络编程总结

目录 一、拾枝杂谈 1.网络通信 : 2.网络 : 3.IP : 4.IPv4的ip地址分类 : 5.域名和端口 : 6.网络协议 : 二、网络编程 1.InetAddress : 1 常用方法 : 2 代码演示 : 2.Socket : 1 概述 : 2 代码演示 : eg1 : 客户端连接服务端 eg2 : 结束标记 eg3 : 网络传输文件…

Linux设置进程自启动

systemd学习 http://www.jinbuguo.com/systemd/systemctl.html https://blog.csdn.net/sinat_35815559/article/details/102867290 常用命令 立即启动一个服务&#xff1a; systemctl start xxx.service立即停止一个服务&#xff1a; systemctl stop xxx.service重启一个服…