RocketMQ 重试机制详解及最佳实践

news2024/9/23 11:24:36

作者:斜阳

引言

本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践

RocketMQ 的重试机制包括三部分,分别是生产者重试,服务端内部数据复制遇到非预期问题时重试,消费者消费重试。本文中仅讨论生产者重试和消费者消费重试两种面向用户侧的实现。

在这里插入图片描述

生产者发送重试

RocketMQ 的生产者在发送消息到服务端时,可能会因为网络问题,服务异常等原因导致调用失败,这时候应该怎么办?如何尽可能的保证消息不丢失呢?

1. 生产者重试次数

RocketMQ 在客户端中内置了请求重试逻辑,支持在初始化时配置消息发送最大重试次数(默认为 2 次),失败时会按照设置的重试次数重新发送。直到消息发送成功,或者达到最大重试次数时结束,并在最后一次失败后返回调用错误的响应。对于同步发送和异步发送,均支持消息发送重试

  • 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败(返回错误码或抛出异常)。
  • 异步发送:调用线程不会阻塞,但调用结果会通过回调的形式,以异常事件或者成功事件返回。

2. 生产者重试间隔

在介绍生产者重试前,我们先来了解下流控的概念,流控一般是指服务端压力过大,容量不足时服务端会限制客户端收发消息的行为,是服务端自我保护的一种设计。RocketMQ 会根据当前是否触发了流控而采用不同的重试策略:

非流控错误场景:其他触发条件触发重试后,均会立即进行重试,无等待间隔

流控错误场景:系统会按照预设的指数退避策略进行延迟重试

  • 为什么要引入退避和随机抖动?

如果故障是由过载流控引起的,重试会增加服务端负载,导致情况进一步恶化,因此客户端在遇到流控时会在两次尝试之间等待一段时间。每次尝试后的等待时间都呈指数级延长。指数回退可能导致很长的回退时间,因为指数函数增长很快。指数退避算法通过以下参数控制重试行为,更多信息,请参见 connection-backoff.md。

INITIAL_BACKOFF:第一次失败重试前后需等待多久,默认值:1 秒;
MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6;
JITTER :随机抖动因子,默认值:0.2;
MAX_BACKOFF :等待间隔时间上限,默认值:120 秒;
MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20 秒。

ConnectWithBackoff()
  current_backoff = INITIAL_BACKOFF
  current_deadline = now() + INITIAL_BACKOFF
  while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS)
    SleepUntil(current_deadline)
    current_backoff = Min(current_backoff * MULTIPLIER, MAX_BACKOFF)
    current_deadline = now() + current_backoff + UniformRandom(-JITTER * current_backoff, JITTER * current_backoff)

特别说明:对于事务消息,只会进行透明重试(transparent retries),网络超时或异常等场景不会进行重试。

3. 重试带来的副作用

不停的重试看起来很美好,但也是有副作用的,主要包括两方面:消息重复,服务端压力增大

  • 远程调用的不确定性,因请求超时触发消息发送重试流程,此时客户端无法感知服务端的处理结果;客户端进行的消息发送重试可能会导致消费方重复消费,应该按照用户ID、业务主键等信息幂等处理消息

  • 较多的重试次数也会增大服务端的处理压力

4. 用户的最佳实践是什么

1)合理设置发送超时时间,发送的最大次数

发送的最大次数在初始化客户端时配置在 ClientConfiguration;对于某些实时调用类场景,可能会导致消息发送请求链路被阻塞导致业务请求整体耗时高或耗时;需要合理评估每次调用请求的超时时间以及最大重试次数,避免影响全链路的耗时。

2)如何保证发送消息不丢失

由于分布式环境的复杂性,例如网络不可达时 RocketMQ 客户端发送请求重试机制并不能保证消息发送一定成功。业务方需要捕获异常,并做好冗余保护处理,常见的解决方案有两种:

    1. 向调用方返回业务处理失败;
    2. 尝试将失败的消息存储到数据库,然后由后台线程定时重试,保证业务逻辑的最终一致性。

3)关注流控异常导致无法重试

触发流控的根本原因是系统容量不足,如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议执行服务端扩容,将请求调用临时替换到其他系统进行应急处理。

4)早期版本客户端如何使用故障延迟机制进行发送重试?

对于 RocketMQ 4.x 和 3.x 以下客户端开启故障延迟机制可以用:

producer.setSendLatencyFaultEnable(true)

配置重试次数使用:

producer.setRetryTimesWhenSendFailed()
producer.setRetryTimesWhenSendAsyncFailed()

消费者消费重试

消息中间件做异步解耦时的一个典型问题是如果下游服务处理消息事件失败,那应该怎么做呢?

RocketMQ 的消息确认机制以及消费重试策略可以帮助分析如下问题:

  • 如何保证业务完整处理消息?

消费重试策略可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分消息消费异常导致业务状态不一致。

  • 业务应用异常时处理中的消息状态如何恢复?

当系统出现异常(宕机故障)等场景时,处理中的消息状态如何恢复,消费重试具体行为是什么。

1. 什么是消费重试?

  • 什么时候认为消费失败?
    消费者在接收到消息后将调用用户的消费函数执行业务逻辑。如果客户端返回消费失败 ReconsumeLater,抛出非预期异常,或消息处理超时(包括在 PushConsumer 中排队超时),只要服务端服务端一定时间内没收到响应,将认为消费失败

  • 消费重试是什么?
    消费者在消费某条消息失败后,服务端会根据重试策略重新向客户端投递该消息。超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中;

  • 重试过程状态机:消息在重试流程中的状态和变化逻辑;

  • 重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间;

  • 最大重试次数:消息可被重试消费的最大次数。

2. 消息重试的场景

需要注意重试是应对异常情况,给予程序再次消费失败消息的机会,不应该被用作常态化的链路。

推荐使用场景:

  • 业务处理失败,失败原因跟当前的消息内容相关,预期一段时间后可执行成功;
  • 是一个小概率事件,对于大批的消息只有很少量的失败,后面的消息大概率会消费成功,是非常态化的。

正例:消费逻辑是扣减库存,极少量商品因为乐观锁版本冲突导致扣减失败,重试一般立刻成功。

错误使用场景:

  • 消费处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的。

反例:订单在数据库中状态已经是已取消,此时如果收到发货的消息,处理时不应返回消费失败,而应该返回成功并标记不用发货。

  • 消费处理中使用消费失败来做处理速率限流,是不合理的。
    限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路。
    这种做法会让消息反复在服务端和客户端之间传递,增大了系统的开销,主要包括以下方面:
    • RocketMQ 内部重试涉及写放大,每一次重试将生成新的重试消息,大量重试将带来严重的 IO 压力;
    • 重试有复杂的退避逻辑,内部实现为梯度定时器,该定时器本身不具备高吞吐的特性,大量重试将导致重试消息无法及时出队。重试的间隔将不稳定,将导致大量重试消息延后消费,即削峰的周期被大幅度延长。

3. 不要以重试替代限流

上述误用的场景实际上是组合了限流和重试能力来进行削峰,RocketMQ 推荐的削峰最佳手段为组合限流和堆积,业务以保护自身为前提,需要对消费流量进行限流,并利用 RocketMQ 提供的堆积能力将超出业务当前处理的消息滞后消费,以达到削峰的目的。下图中超过处理能力的消息都应该被堆积在服务端,而不是通过消费失败进行重试。

在这里插入图片描述

如果不想依赖额外的产品/组件来完成该功能,也可以利用一些本地工具类,比如 Guava 的 RateLimiter 来完成单机限流。如下所示,声明一个 50 QPS 的 RateLimiter,在消费前以阻塞的方式 acquire 一个令牌,获取到即处理消息,未获取到阻塞。

RateLimiter rateLimiter = RateLimiter.create(50);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // 设置订阅组名称
    .setConsumerGroup(consumerGroup)
    // 设置订阅的过滤器
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
        // 阻塞直到获得一个令牌,也可以配置一个超时时间
        rateLimiter.acquire();
        LOGGER.info("Consume message={}", messageView);
        return ConsumeResult.SUCCESS;
    })
    .build();

4. PushConsumer 消费重试策略

PushConsumer 消费消息时,消息的几个主要状态如下:

在这里插入图片描述

  • Ready:已就绪状态。消息在消息队列RocketMQ版服务端已就绪,可以被消费者消费;

  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态;

  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机;

  • DLQ:死信状态
    消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试。
    该消息会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

  • 最大重试次数

PushConsumer 的最大重试次数由创建时决定。

例如,最大重试次数为 3 次,则该消息最多可被投递 4 次,1 次为原始消息,3 次为重试投递次数。

  • 重试间隔时间
  • 无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下:

说明:若重试次数超过 16 次,后面每次重试间隔都为 2 小时。

在这里插入图片描述

  • 顺序消息:重试间隔为固定时间,默认为 3 秒。

5. SimpleConsumer 消费重试策略

和 PushConsumer 消费重试策略不同,SimpleConsumer 消费者的重试间隔是预分配的,每次获取消息消费者会在调用 API 时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。

在这里插入图片描述

由于不可见时间为预分配的,可能和实际业务中的消息处理时间差别较大,可以通过 API 接口修改不可见时间。

例如,预设消息处理耗时最多 20 ms,但实际业务中 20 ms内消息处理不完,可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制。

修改消息不可见时间需要满足以下条件:

  • 消息处理未超时
  • 消息处理未提交消费状态

如下图所示,消息不可见时间修改后立即生效,即从调用 API 时刻开始,重新计算消息不可见时间。

在这里插入图片描述

  • 最大重试次数

与 PushConsumer 相同。

  • 消息重试间隔

消息重试间隔 = 不可见时间 - 消息实际处理时长

例如:消息不可见时间为 30 ms,实际消息处理用了 10 ms 就返回失败响应,则距下次消息重试还需要 20 ms,此时的消息重试间隔即为 20 ms;若直到 30 ms 消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为 0 ms。

SimpleConsumer 的消费重试间隔通过消息的不可见时间控制。

//消费示例:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。
ClientServiceProvider provider1 = ClientServiceProvider.loadService();
String topic1 = "Your Topic";
FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);

SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder()
        //设置消费者分组。
        .setConsumerGroup("Your ConsumerGroup")
        //设置接入点。
        .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
        //设置预绑定的订阅关系。
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        .build();
List<MessageView> messageViewList = null;
try {
    //SimpleConsumer需要主动获取消息,并处理。
    messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
    messageViewList.forEach(messageView -> {
        System.out.println(messageView);
        //消费处理完成后,需要主动调用ACK提交消费结果。
        //没有ack会被认为消费失败
        try {
            simpleConsumer.ack(messageView);
        } catch (ClientException e) {
            e.printStackTrace();
        }
    });
} catch (ClientException e) {
    //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
    e.printStackTrace();
}
  • 修改消息的不可见时间

案例:某产品使用消息队列来发送解耦“视频渲染”的业务逻辑,发送方发送任务编号,消费方收到编号后处理任务。由于消费方的业务逻辑耗时较长,消费者重新消费到同一个任务时,该任务未完成,只能返回消费失败。在这种全新的 API 下,用户可以调用可以通过修改不可见时间给消息续期,实现对单条消息状态的精确控制。

simpleConsumer.changeInvisibleDuration();
simpleConsumer.changeInvisibleDurationAsync();

6. 功能约束与最佳实践

  • 设置消费的最大超时时间和次数

尽快明确的向服务端返回成功或失败,不要以超时(有时是异常抛出)代替消费失败。

  • 不要用重试机制来进行业务限流

错误示例:如果当前消费速度过高触发限流,则返回消费失败,等待下次重新消费。

正确示例:如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费。

  • 发送重试和消费重试会导致相同的消息重复消费,消费方应该有一个良好的幂等设计

正确示例:某系统中消费的逻辑是为某个用户发送短信,该短信已经发送成功了,当消费者应用重复收到该消息,此时应该返回消费成功。

总结

本文主要介绍重试的基本概念,生产者消费者收发消息时触发重试的条件和具体行为,以及 RocketMQ 收发容错的最佳实践。

重试策略帮助我们从随机的、短暂的瞬态故障中恢复,是在容忍错误时,提高可用性的一种强大机制。但请谨记 “重试是对于分布式系统来说自私的”,因为客户端认为其请求很重要,并要求服务端花费更多资源来处理,盲目的重试设计不可取,合理的使用重试可以帮助我们构建更加弹性且可靠的系统。

欢迎扫描下方二维码加入钉钉群一起沟通交流~

在这里插入图片描述

点击此处,进入官网了解更多详情~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/12438.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]java毕业设计静谧空间自习室预订系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

服务器部署Vue2脚手架的PIXI游戏项目-知识点注意

文章目录安装PIXI框架方式一&#xff08;安装&#xff09;方式二&#xff08;引入&#xff09;javaScript代码位置initPixi方法组件挂载完毕后调用注意文件导入使用import一个个导入并命名使用setTimeout()方法使用一般方法调用表达式使用安装PIXI框架 方式一&#xff08;安装…

高通量筛选检测方法-分子篇

分子水平的筛选更多的是检测酶/受体功能的改变或探针/蛋白质结合的抑制&#xff0c;或是检测蛋白质-配体结合的结构、动力学和亲和度。 下面将介绍了荧光偏振、荧光共振能量转移、酶联免疫吸附、表面等离子共振和核磁共振技术几种方法。 ■ 荧光偏振 荧光偏振是一项在高通量筛…

2-STM32GPIO输入之按键

文章目录1-硬件设计1.1 按键消斗1.1.1 RS触发器1.1.2 电容滤波2 按键电路设计2.1 软件消斗2.2 硬件消斗2.3 检测原理2-软件设计2.1 软件消斗2.1.1原理2.1.2 编程要点2.1.3 步骤2.2 代码编写2.2.1 主程序2.2.2 按键初始化2.2.2 按键扫描本章讲述GPIO输入的应用&#xff0c;使用独…

第03章_用户与权限管理

第03章_用户与权限管理1 用户管理1.1 登录MySQL服务器1.2 创建用户1.3 修改用户1.4 删除用户1.5 设置当前用户密码1.6 修改其它用户密码1.7 MySQL8密码管理(了解)2. 权限管理2.1 权限列表2.2 授予权限的原则2.3 授予权限2.4 查看权限2.5 收回权限3. 权限表3.1 user表3.2 db表3.…

【springboot】18、内置 Tomcat 配置和切换

文章目录基本介绍Tomcat配置切换其他Web服务总结基本介绍 SpringBoot 支持的 webServer有: Tomcat, Jetty, or Undertow&#xff0c;我们使用spring-boot-starter-web进行web开发时&#xff0c;默认使用的就是Tomcat&#xff0c;下面来说明一下tomcat的配置以及切换其他的Web服…

小啊呜产品读书笔记001:《邱岳的产品手记-05》第9讲 产品案例分析:Hopper的“人工智能” 第10讲 产品被抄袭了怎么办?

小啊呜产品读书笔记001&#xff1a;《邱岳的产品手记-05》第9讲 产品案例分析&#xff1a;Hopper的“人工智能” & 第10讲 产品被抄袭了怎么办&#xff1f;一、今日阅读计划二、泛读&知识摘录1、09 讲 产品案例分析&#xff1a;Hopper的“人工智能”2、10 讲 产品被抄袭…

《机器学习实战》8.预测数值型数据:回归

目录 预测数值型数据&#xff1a;回归 1 利用线性回归找到最佳拟合直线 2 局部加权线性回归 3 示例&#xff1a;预测鲍鱼的年龄 4 缩减系数来“理解”数据 4.1 岭回归 4.2 lasso 4.3 前向逐步回归 5 权衡偏差与方差 6 示例&#xff1a;预测乐高玩具套装的价格 6.1 收…

数字化转型指南发布,官方明确这样做!

上周&#xff0c;工信部《中小企业数字化转型指南》&#xff08;以下简称《指南》&#xff09;一经发布&#xff0c;便获得了大量官方媒体的转发&#xff0c;成为了几乎所有制造人的关注所在。制造企业数字化转型的标准路径首次被标准化&#xff0c;并传递给了更多的中国制造企…

python可以考的资格认证有哪些?

前言 可以考虑用Python做一个博客&#xff0c;或者仿制一个微博&#xff0c;或者仿制一个视频网站&#xff0c;或者仿制一个购物网站。界面简单一些&#xff0c;但是基础功能好用就行。&#xff08;文末送读者福利&#xff09; 2.或者学习用Python在网上爬一些数据&#xff0…

就地执行Windows Server2022升级

项目初期背景:“微软Windows Server 2012/2012 R2将于2023年10月停止支持 微软今天发出提醒,Windows Server 2012 和 Windows Server 2012 R2 将于 2023 年 10 月终止支持,届时将不再发布补丁更新。由于外企公司比较注重信息安全,对所有服务器需要确保有补丁修复更新,以便保…

一些逻辑漏洞案例

逻辑漏洞的一些案例 某edu高校逻辑漏洞弱口令 已提交该校&#xff0c;已修复 注册登陆 寻找上传点&#xff0c;无果&#xff0c;后缀名不可控 找到另一个登陆点&#xff0c;尝试使用之前注册的账户登陆、爆破、均无果 在测试找回密码处&#xff0c;发送admin用户发现返回管理…

云计算实验4 面向行业背景的大数据分析与处理综合实验

一、 实验目的 掌握分布式数据库接口Spark SQL基本操作&#xff0c;以及训练综合能力&#xff0c;包括&#xff1a;数据预处理、向量处理、大数据算法、预测和可视化等综合工程能力 二、 实验环境 Linux的虚拟机环境和实验指导手册 三、 实验任务 完成Spark SQL编程实验、…

[附源码]java毕业设计基于篮球云网站

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

电压放大器原理(电压放大器适用于什么场合使用)

前阵子有不少的工程师在后台咨询&#xff0c;电压放大器适用于什么场合使用、电压放大器原理等等的内容&#xff0c;虽然电压放大器使用的人群很多&#xff0c;但是还是有不少新手工程师对于电压放大器一知半解。今天安泰电子就来为大家介绍电压放大器的原理以及应用场合。 电压…

redis之变慢了该如何排查?

写在前面 不管什么工具&#xff0c;会使用永远只是第一步&#xff0c;第二步是当其出现某些问题时&#xff0c;拥有排查和修复问题的能力&#xff0c;而我们在使用Redis的过程中&#xff0c;变慢就是其中一个比较棘手的问题&#xff0c;因此本文就一起来看下&#xff0c;当遇到…

STM32实现0.96寸OLED显示模拟IIC和IIC四种实现(标准库和HAL库)

目录 本文通过四种方法实现OLED显示 设备选择 OLED介绍 接线表设计 OLED应用 1.标准库模拟IIC实现OLED显示 2.标准库IIC实现OLED显示 3.HAL库模拟IIC实现OLED显示 4.HAL库IIC实现OLED显示 实现效果 代码下载 本文通过四种方法实现OLED显示 设备选择 1.单片机&#…

数字电路中的基础电路结构

基本单元&#xff1a; 1.1 与非门 1.2 或非门 2输入与非门需要4个晶体管&#xff08;n输入与非门需要2xn个晶体管&#xff09;&#xff0c;非门需要两个晶体管&#xff0c;2输入或非门需要6个晶体管&#xff08;n输入或非门需要 2xn 2个晶体管&#xff09;。 静态存储器 1bi…

用 Java 的 IO 流进行读写文件操作

前言 在计算机领域里 IO&#xff0c;有时也写作 I/O&#xff0c;是Input / Output的缩写&#xff0c;也就是输入和输出。这里的输入和输出是指不同系统之间的数据输入和输出&#xff0c;比如读写文件数据&#xff0c;读写网络数据等等。 本文内容大纲如下: Java 有哪些IO框…

数据结构【AVL树模拟实现】

目录 AVL树概念 AVL树结构 insert AVL树的旋转 新节点插入较高右子树的右侧---右右&#xff1a;左单旋 新节点插入较高左子树的左侧---左左&#xff1a;右单旋 新节点插入较高左子树的右侧---左右&#xff1a;先左单旋再右单旋 新节点插入较高右子树的左侧---右左&…