Rabbit高级特性 - 消息重试机制(两种实现)

news2024/9/22 9:56:05

文章目录

  • 消息重试机制
    • 概述
    • 实现方式一:基于消息手动确认机制,返回 nack 实现
      • 配置文件
      • 交换机、队列、绑定
      • 生产者接口
      • 消费者
      • 演示和结论
    • 实现方式二:基于重试配置实现
      • 配置文件
      • 交换机、队列、绑定
      • 生产者接口
      • 消费者
      • 演示和结论

消息重试机制


概述

消息重试机制就是在消息处理失败之后重新发送,主要时为了解决消息发送过程可能会出现的问题,例如 网络故障、服务临时不可用 等.

Ps:如果时程序逻辑引起的错误,那么即使重试多少次都是没有用的,但是可以通过配置重试次数来解决.

实现方式一:基于消息手动确认机制,返回 nack 实现

配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: manual # 手动确认

交换机、队列、绑定

    @Bean("ackExchange")
    fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE)
    @Bean("ackQueue")
    fun ackQueue() = Queue(MQConst.ACK_QUEUE)
    @Bean
    fun ackBinding(
        @Qualifier("ackExchange") exchange: DirectExchange,
        @Qualifier("ackQueue") queue: Queue,
    ): Binding {
        return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(MQConst.ACK_BINDING)
    }

生产者接口

@RestController
@RequestMapping("/mq3")
class MQ3Api(
   val rabbitTemplate: RabbitTemplate
) {

    @RequestMapping("/ack")
    fun ack(): String {
        rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
        return "ok"
    }

}

消费者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset

@Component
class AckListener {

    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        val deliveryTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
            val a = 1 / 0
            channel.basicAck(deliveryTag, false)
        } catch (e: Exception) {
            //通过返回 nack,并设置 requeue 为 ture 实现消息重新入队,并进行重试
            channel.basicNack(deliveryTag, false, true) 
        }
    }

}

演示和结论

在这里插入图片描述
deliverTag 自增的原因: 引发异常后,会返回 nack,并且参数 requeue = true,表示重新入队,然后进行重试,将队列中的消息再次发送给生产者,因此 deliverTag 会自增.

缺点: 如果是由于程序逻辑异常引起的重试,那么无论重试多少次都没用,并且不断重试会导致负载飙升,性能下降.

实现方式二:基于重试配置实现

配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: auto # 开启重试机制,这里必须是 auto,否则不生效!
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 5000ms # 失败等待时常
          max-attempts: 5 # 最大重试次数(包括第一次消费)

Ps:开启重试机制,acknowledge-mode 必须指定为 auto,否则不生效!

交换机、队列、绑定

    @Bean("ackExchange")
    fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE)
    @Bean("ackQueue")
    fun ackQueue() = Queue(MQConst.ACK_QUEUE)
    @Bean
    fun ackBinding(
        @Qualifier("ackExchange") exchange: DirectExchange,
        @Qualifier("ackQueue") queue: Queue,
    ): Binding {
        return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(MQConst.ACK_BINDING)
    }

生产者接口

    @RequestMapping("/ack")
    fun ack(): String {
        rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
        return "ok"
    }

消费者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset

@Component
class AckListener {

    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
        val a = 1 / 0
    }
    
}

演示和结论

在这里插入图片描述
deliverTag 不自增的原因: 因为是消息已经发出去了,即使失败了也不会重回队列,而是直接重新发一遍消息.

好处: 不仅可以控制重试次数(防止类似于上面讲到的确认应答引起的无限重试),还可以控制每次重试的间隔时间(防止负载飙升).

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1990282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

对深度学习神经网络做了小改进,效果提升明显,可以发论文吗?

小改了深度学习神经网络某一层,效果提升显著,可以发顶会吗? 结论就是,当然可以发,只是能不能发出高质量论文,中稿顶会,还得看你这个小改进的具体情况还有你讲故事的能力。 先说改进的具体情况…

国家统计局中国主要城市面板数据(1990-2023年)

数据说明:数据来源于国家统计局,指标包含:城市、年份、第三产业增加值、第一产业增加值 地区生产总值、第二产业增加值、年末户籍人口、城镇非私营单位在岗职工平均工资 房地产开发投资额、房地产开发住宅投资额、房地产开发办公楼投资额、房…

什么是股指期权与股指期货的套利策略?

沪深300股指期权和沪深300股指期货就像是孪生兄弟,它们不仅到期时间一样,结算价格也一样,而且都是现金结算。这意味着,如果你用期权的方式“造”出一个期货来,跟市场上真实交易的期货,在到期那天&#xff0…

Eclipse 首选项(Preferences)

设置首选项 该对话框可通过框架管理但是其他插件可以设置其他页面来管理首选项的配置。 我们可以通过 Window 菜单选择 Preferences 菜单项来开启该对话框。 首选项页面有多个分类组成。你可以在左侧菜单中展开各个节点来查看首选项的配置。 左上角的输入框可以快速查找首选…

#子传父父传子props和emits #封装的table #vue3

#子传父&父传子props和emits #封装的table #vue3 父组件&#xff1a;emits defineEmits props 子组件&#xff1a; 子组件 <template><el-table v-bind"$attrs" ref"innerTableRef" v-loading"loading" border :data"tabl…

鸿蒙HarmonyOS开发:如何灵活运用动画效果提升用户体验

文章目录 一、动画概述1、动画的目的 二、显式动画 (animateTo)1、接口2、参数3、AnimateParam对象说明4、示例5、效果 三、属性动画 (animation)1、接口2、参数3、AnimateParam对象说明4、系统可动画属性4、示例5、效果 一、动画概述 动画的原理是在一个时间段内&#xff0c;…

QT使用ui文件创建窗口

目录 带ui文件 添加状态栏 资源文件的使用 添加文件资源 使用文件资源 带ui文件 编辑 添加状态栏 MainWindow::MainWindow(QWidget *parent): QMainWindow(parent), ui(new Ui::MainWindow) {ui->setupUi(this);// 添加状态栏ui->statusbar->addWidget(new QLab…

codeforces查看题解

文章目录 1. 步骤一&#xff1a;右键单击箭头指向位置&#xff0c;根据提示打开链接2. 步骤二&#xff1a;左键单击箭头指向位置3. 结果&#xff1a;红线上方是参考代码&#xff0c;下方是测试样例4. 补充&#xff1a;① 右边这个方框可以筛选想要查看代码的状态(Accepted、Wro…

AI在医学领域:残差扩散模型预测特发性肺纤维化 (IPF)

关键词&#xff1a; IPF 进展预测、残差扩散模型、临床信息 特发性肺纤维化&#xff08;Idiopathic Pulmonary Fibrosis&#xff0c;IPF&#xff09;是一种严重且不可逆的肺部疾病&#xff0c;它会导致肺部组织出现瘢痕和增厚&#xff0c;从而引起呼吸困难。。及时对IPF进行治…

Unity入门4——常用接口

C#中常用类和接口 DateTime&#xff1a;表示某个时刻 DateTime.Now&#xff1a;拿到系统当前时间DtaTime.TimeOfDay&#xff1a;获取此实例当天的时间 Quaternion&#xff1a;用来旋转&#xff0c;采用四元数&#xff0c;由w&#xff08;实部&#xff09;和x,y,z&#xff08;虚…

ChinaJoy 2024: 维塔士携自研游戏亮相,探讨数据驱动游戏开发

2024年7月30日,上海——全球领先的视频游戏开发公司维塔士精彩亮相第二十一届中国国际数码互动娱乐展览会(ChinaJoy),并首次公开自研游戏《唐传奇:琵琶行》DEMO试玩。在展会首日举办的2024中国游戏开发者大会(CGDC)上,来自维塔士西安工作室的执行制作人熊鹏昱受邀发表题为《维塔…

springboot的拦截器,监听器,过滤器,servlet的使用(三大组件)

目录 1. 拦截器1.1 简介1.2 使用 2. 监听器2.1 简介2.2 使用 3. 过滤器3.1 简介3.2 使用 4. servlet4.1 简介4.2 使用 5. 例子6. 心得 1. 拦截器 1.1 简介 Spring Boot 拦截器&#xff08;Interceptor&#xff09;是Spring MVC中用于在请求处理流程中执行某些操作的组件。它们…

Open3D 三维重建-Alpha Shapes (α-形状)

目录 一、概述 1.1原理 1.2实现步骤 二、代码实现 2.1关键函数 2.1.1函数 2.1.2参数详解 2.2完整代码 三、实现效果 3.1原始点云 3.2处理后点云 Open3D点云算法汇总及实战案例汇总的目录地址&#xff1a; Open3D点云算法与点云深度学习案例汇总&#xff08;长期更新…

08-PCB工程文件的创建

1.创建工程文件 2.创建原理图库 3.创建PCB元件库 4.创建原理图 5.创建PCB 6.改名&#xff0c;保持和项目名一致 最后save。

uniapp获取swiper中子组件的内容高度

swiper有默认高度,如果不单独设置一个具体高度&#xff0c;swiper后面的内容将不会展示 这里展示的例子是: swiper中放有一个子组件,想要完整展示子组件的内容&#xff0c;swiper就需要获取到子组件的内容高度并设置 <!-- 注意: 这里的单位是 px,不是rpx --><swiper…

【STM32】IO口取反 | 寄存器方式 | 异或运算符 | 原理

目录 STM32 IO口取反 | 寄存器方式 | 异或运算符 | 原理1. 引言2. GPIO基础知识2.1 GPIO概述2.2 STM32的GPIO架构2.3 GPIO寄存器简介 3. GPIO引脚取反原理3.1 寄存器操作实现取反3.2 异或运算符的应用 4. 示例代码4.1 基础示例&#xff1a;LED闪烁4.2 应用实例&#xff1a;继电…

大数据面试SQL(三):每分钟在线直播人数

文章目录 每分钟在线直播人数 一、题目 二、分析 三、SQL实战 四、样例数据参考 每分钟在线直播人数 一、题目 有如下数据记录直播平台主播上播及下播时间&#xff0c;根据该数据计算出平台每分钟的在线直播人数。 这里用主播名称做统计&#xff0c;前提是主播名称唯一…

【初阶数据结构题目】16.用队列实现栈

用队列实现栈 点击链接答题 思路&#xff1a; 出栈&#xff1a;找不为空的队列&#xff0c;将size-1个数据导入到另一个队列中。 入栈&#xff1a;往不为空队列里面插入数据 取栈顶元素&#xff1a; 例如&#xff1a; 两个队列&#xff1a; Q1&#xff1a;1 2 3Q2&#xff1a;…

『大模型笔记』从API到Agent:万字长文洞悉LangChain工程化设计

『大模型笔记』从API到Agent&#xff1a;万字长文洞悉LangChain工程化设计 具体内容来自&#xff1a;从API到Agent&#xff1a;万字长文洞悉LangChain工程化设计

Spring源码解析(30)之AOP拦截链执行过程

一、前言 在上一节中我们介绍了AOP动态代理对象的创建过程&#xff0c;并且看到了Spring AOP在生成calllBacks的时候第一个拦截器就是&#xff1a;DynamicAdvisorInterceptor&#xff0c;所以我们通过代理对象执行对应的方法的时候就如跳入到这个拦截器中&#xff0c;接下来我们…