[RabbitMQ] 重试机制+TTL+死信队列

news2024/11/28 23:44:26

🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
在这里插入图片描述

目录

  • 1. 重试机制
    • 1.1 重试配置
    • 1.2 配置交换机队列
    • 1.3 发送消息
    • 1.4 消费消息
    • 1.5 运行测试
    • 1.6 手动确认
  • 2. TTL
    • 2.1 设置消息TTL
    • 2.2 设置消息队列的TTL
    • 2.3 两者的区别
  • 3. 死信队列
    • 3.1 死信队列的概念
    • 3.2 代码示例
      • 3.2.1 声明队列和交换机
      • 3.2.2 正常队列与死信交换机的绑定
      • 3.2.3 制造死信产生的条件
      • 3.2.4 发送消息
      • 3.2.5 测试死信
    • 3.3 常见面试题

1. 重试机制

在消息传递的过程中,可能会遇到各种问题,比如网络故障,服务不可用,资源不足等,这些问题都可能会导致消息处理失败.为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败之后重新发送.
我们也可以对重试机制设置重试次数.超过了重试的限制次数,如果没有对队列进行死信队列的绑定,那么该消息就会发生丢失.

1.1 重试配置

  rabbitmq:
    host: 182.92.204.253
    port: 5672
    username: jiangruijia
    password: *****
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: auto
        retry:
          initial-interval: 5000ms
          enabled: true
          max-attempts: 5
  • initial-interval: 表示初始失败的等待时间为5s
  • enabled: 表示开启消费者失败重试
  • max-attempts: 最大重试次数,包括首次发送
  • 注意: 重试机制在自动确认模式之下才会生效,如果配置为手动确认,则重试机制不会生效.

1.2 配置交换机队列

public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE = "retry_exchange";
@Bean
public DirectExchange retryExchange(){
    return ExchangeBuilder.directExchange(Constant.RETRY_EXCHANGE).durable(true).build();
}
@Bean
public Queue retryQueue(){
    return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
}
@Bean
public Binding retryBinding(@Qualifier("retryExchange") DirectExchange exchange,@Qualifier("retryQueue") Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("retry");
}

1.3 发送消息

@RequestMapping("/retry")
public String retry(){
    rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE,"retry","retry消息");
    return "发送成功";
}

1.4 消费消息

@Component
public class RetryListener {
    @RabbitListener(queues = Constant.RETRY_QUEUE)
    public void retryListener(Message message) throws UnsupportedEncodingException {
        System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8")
        ,message.getMessageProperties().getDeliveryTag());
        int i = 3/0;
        System.out.println("处理完成");
    }
}

1.5 运行测试

使用Postman调用接口,观察控制台信息
在这里插入图片描述
我们看到由于消费者发生异常,消息在不停地进行重试,重试5次之后,抛出了异常.
如果我们对异常进行捕获,则不会进行重试.

@Component
public class RetryListener {
    @RabbitListener(queues = Constant.RETRY_QUEUE)
    public void retryListener(Message message) throws UnsupportedEncodingException {
        System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8")
        ,message.getMessageProperties().getDeliveryTag());
        try {
            int i = 3/0;
        }catch (Exception e){
            System.out.println("处理失败");
        }
        System.out.println("处理完成");
    }
}

在这里插入图片描述

1.6 手动确认

如果我们把确认接收机制改为手动确认机制

@Component
public class RetryListener {
    @RabbitListener(queues = Constant.RETRY_QUEUE)
    public void retryListener(Message message, Channel channel) throws IOException {
        System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8")
        ,message.getMessageProperties().getDeliveryTag());
        try {
            Thread.sleep(1000);
            int i = 3/0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            System.out.println("处理失败");
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
        }
        System.out.println("处理完成");
    }
}

运行结果:
在这里插入图片描述
我们看到,在手动控制模式的时候,重试的次数不会像自动控制那样直接生效,因为是否重试以及何时重试更多地取决于应用程序和消费者的实现逻辑.

2. TTL

TTL,即过期时间,RabbitMQ可以对消息和队列设置TTL.
当消息到达存活时间之后,还没有被消费,就会被自动清除.

2.1 设置消息TTL

目前有两种方法可以设置消息的TTL.一是设置队列的TTL,队列中所有消息都有相同的过期时间,二是设置每条消息的过期时间,每条消息都可以有不同的TTL.如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准.
首先我们针对每条消息设置TTL.方法就是我们在发送消息的时候对消息的expiretion参数进行设置,单位为毫秒.
我们先来配置交换机和队列.

public static final String TTL_QUEUE = "ttl_queue";
public static final String TTL_EXCHANGE = "ttl_exchange";
@Bean
public DirectExchange ttlExchange(){
    return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).durable(true).build();
}
@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}
@Bean
public Binding ttlBinding(@Qualifier("ttlExchange") DirectExchange exchange,@Qualifier("ttlQueue") Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("ttl");
}

发送消息

@RequestMapping("/ttl")
public String ttl(){
    String msg = "ttl消息";
    Message message = new Message(msg.getBytes(StandardCharsets.UTF_8));
    message.getMessageProperties().setExpiration("10000");//设置消息过期时间为10s
    rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl",message);
    return "发送信息";
}

运行程序,观察结果:

  1. 发送消息之后,Ready消息为1
    在这里插入图片描述
  2. 过了10s之后,发现消息被删除
    在这里插入图片描述

2.2 设置消息队列的TTL

设置队列TTL的方法是在创建队列时,加入x-message-ttl参数实现,单位为毫秒.
设置队列和绑定关系

@Bean
public Queue ttlQueue2(){
    Map<String,Object> map = new HashMap<>();
    map.put("x-message-ttl",10000);//设置10s过期
    return QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(map).build();
}
@Bean
public Binding ttl2Binding(@Qualifier("ttlExchange") DirectExchange exchange,@Qualifier("ttlQueue2") Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("ttl2");
}

队列的声明也可以简写为:

@Bean
public Queue ttlQueue2(){
    return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(10000).build();
}

发送消息:

@RequestMapping("/ttl2")
public String ttl2(){
    rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl2","ttl2消息");
    return "发送成功";
}

运行程序之后观察结果:
运行之后发现,新增了一个队列,队列有一个TTL标识.
在这里插入图片描述
使用Postman调用接口发送消息:
在这里插入图片描述
10s之后发现队列中的消息被删除:
在这里插入图片描述

2.3 两者的区别

设置队列的ttl属性方法,一旦消息过期,就会立即从队列中删除.如果消息设置ttl,即使消息过期,也不会立即从队列中删除,而是在即将投递到消费者之前进行判定,如果过期了,才进行删除.
这两种方法处理过期消息的原理如下:
设置队列的过期时间,队列中已经过期的消息肯定在队列头部,RabbitMQ只需要定期扫描队头是否有过期的消息即可.
而设置消息的TTL的方式,每条消息的过期时间不同,如果定期扫描整个队列,效率是非常低的,所以不如等到此消息即将被消费时再判定是否过期.

3. 死信队列

3.1 死信队列的概念

死信简单理解就是因为种种原因,无法被消费的信息,就是死信.
有死信,就会有死信队列,当一个消息在一个队列中变为死信之后,它能被重新被发送到另一个交换机中,这个交换机就是DLX(Dead Letter Exchange),绑定DLX的队列,就被称为死信队列(DLQ,Dead Letter Queue).
在这里插入图片描述
消息变为死信一般是由于一下的几种情况:

  1. 消息被拒绝.(Basic.reject/Basic.nack),并且设置requeue参数为false.
  2. 消息过期
  3. 队列达到最大长度

3.2 代码示例

3.2.1 声明队列和交换机

交换机和队列的声明包含两部分:

  • 正常声明的交换机和队列
  • 声明死信队列和死信的交换机
    死信的交换机和队列和普通的交换机队列在声明的时候没有什么区别.
public static final String NORMAL_QUEUE = "normal_queue";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DLX_EXCHANGE = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
@Bean
public DirectExchange normalExchange(){
    return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).durable(true).build();
}
@Bean
public Queue normalQueue(){
    return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
}
@Bean
public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange,@Qualifier("normalQueue") Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("normal");
}
@Bean
public DirectExchange dlxExchange(){
    return ExchangeBuilder.directExchange(Constant.DLX_EXCHANGE).durable(true).build();
}
@Bean
public Queue dlxQueue(){
    return QueueBuilder.durable(Constant.DLX_QUEUE).build();
}
@Bean
public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange exchange,@Qualifier("dlxQueue") Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("dlx");
}

3.2.2 正常队列与死信交换机的绑定

当一个队列中存在死信的时候,RabbitMQ会自动把这个消息重新发布到设置的DLX交换机上,进而被路由到另一个队列,即死信队列.
绑定的时候需要为普通队列设置x-dead-letter-exchangex-dead-letter-routing-key两个参数,第一个参数是绑定的死信交换机,第二个参数是死信队列的路由键.

@Bean
public Queue normalQueue(){
    Map<String,Object> map = new HashMap<>();
    map.put("x-dead-letter-exchange",Constant.DLX_EXCHANGE);
    map.put("x-dead-letter-routing-key","dlx");
    return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(map).build();
}

上面的map可以简写为:

@Bean
public Queue normalQueue(){
    return QueueBuilder.durable(Constant.NORMAL_QUEUE).
            deadLetterExchange(Constant.DLX_EXCHANGE).
            deadLetterRoutingKey("dlx").
            build();
}

3.2.3 制造死信产生的条件

我们通过设置队列长度限制或者是设置消息的过期时间来制造死信.如果队列中的消息过期,或者是队列中的消息超过队列的长度,全部会被路由到死信队列.

@Bean
public Queue normalQueue(){
    return QueueBuilder.durable(Constant.NORMAL_QUEUE).
            deadLetterExchange(Constant.DLX_EXCHANGE).
            deadLetterRoutingKey("dlx").
            ttl(10000).
            maxLength(10L).
            build();
}

3.2.4 发送消息

@RequestMapping("/normal")
public String normal(){
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","normal消息");
    return "发送成功";
}

3.2.5 测试死信

  1. 启动程序之后观察队列
    在这里插入图片描述
    我们发现队列normal_queue有5个标签,则会五个标签分别代表该队列的5个属性:
    • D: durable的缩写,设置持久化.
    • TTL: Time to Live,队列的TTL.
    • Lim: 队列设置了长度(x-max-length).
    • DLX: 死信队列的交换机
    • DLK: 死信队列的路由键
  2. 测试到达过期时间
    接下来我们给正常队列中发送一条消息.
    在这里插入图片描述
    10秒钟之后,队列过期,消息也会过期,所以消息就会进入死信队列.
    在这里插入图片描述
    在这里插入图片描述
  3. 测试超出队列长度
    我们给队列中一次性发送20条消息
    在这里插入图片描述
    我们发现其中的10条消息会直接进入死信队列.
  4. 测试消息被拒收
    编写消费者代码,并抛出异常,普通队列消费者确认接收机制使用手动应答.
@Component
public class NormalListener {
    @RabbitListener(queues = Constant.NORMAL_QUEUE)
    public void normalListener(Message message, Channel channel) throws IOException, InterruptedException {
        System.out.printf("接收到消息%s,tag:%d\n",new String(message.getBody(),"UTF-8"),
                message.getMessageProperties().getDeliveryTag());
        try {
            System.out.println("处理消息");
            int i = 3/0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }catch (Exception e){
            System.out.println("消息处理失败");
            Thread.sleep(1000);
            //拒绝接收之后不放回原队列中,则会放入死信队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
        }
    }
}
@Component
public class DLXListener {
    @RabbitListener(queues = Constant.DLX_QUEUE)
    public void dlxListener(Message message) throws UnsupportedEncodingException {
        System.out.printf("死信队列接收到消息:%s,tag:%d\n",new String(message.getBody(),"UTF-8")
        ,message.getMessageProperties().getDeliveryTag());
    }
}

在这里插入图片描述

3.3 常见面试题

  1. 死信队列的概念
    死信简单理解就是因为种种原因,无法被消费的信息,就是死信.
  2. 来源
    1. 消息被拒绝.(Basic.reject/Basic.nack),并且设置requeue参数为false.
    2. 消息过期
    3. 队列达到最大长度
  3. 死信队列的应用场景
    它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统.
    一般的应用场景还有:
    1. 消息重试:将死信消息重新发送到原队列或另⼀个队列进行重试处理.
    2. 消息丢弃:直接丢弃这些无法处理的消息,以避免它们占用系统资源.
    3. 日志收集:将死信消息作为日志收集起来,用于后续分析和问题定位.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2249378.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端入门之VUE--基础与核心

前言 VUE是前端用的最多的框架&#xff1b;这篇文章是本人大一上学习前端的笔记&#xff1b;欢迎点赞 收藏 关注&#xff0c;本人将会持续更新。 Vue学习笔记 用于构建用户界面的渐进式框架 构建用户界面&#xff1a;基于数据动态渲染页面渐进式&#xff1a;循序渐近的学…

java基础知识(常用类)

目录 一、包装类(Wrapper) (1)包装类与基本数据的转换 (2)包装类与String类型的转换 (3)Integer类和Character类常用的方法 二、String类 (1)String类介绍 1)String 对象用于保存字符串,也就是一组字符序列 2)字符串常量对象是用双引号括起的字符序列。例如:&quo…

嵌入式驱动开发详解2(设备挂载问题)

文章目录 前言设备号设备号的组成设备号的分配静态分配动态分配 驱动挂载与卸载设备节点创建驱动挂载出现问题 前言 驱动的设备挂载和卸载是十分重要的内容&#xff0c;一旦操作不当可能会导致系统崩溃&#xff0c;接下来我将用字符设备的驱动挂载原理进行详细讲解&#xff0c…

谈谈微服务的常用组件

由于微服务给系统开发带来了一些问题和挑战&#xff0c;如服务调用的复杂性、分布式事务的处理、服务的动态管理等&#xff0c;为了更好地解决这些问题和挑战&#xff0c;各种微服务治理的组件应运而生&#xff0c;充当微服务架构的基石和支撑&#xff0c;常用组件如下表&#…

【数字图像处理+MATLAB】通过迭代全局阈值处理算法(Iterative Global Algorithm)实现图像分割

引言 图像分割是将数字图像划分为多个区域&#xff08;或像素的集合&#xff09;的过程&#xff0c;这些区域通常对应于真实世界的物体或图像中的特定部分。图像分割的目标是简化或改变图像的表示形式&#xff0c;使得图像更容易理解和分析。图像分割通常用于定位图像中的物体…

【三维生成】Edify 3D:可扩展的高质量的3D资产生成(英伟达)

标题&#xff1a;Edify 3D: Scalable High-Quality 3D Asset Generation 项目&#xff1a;https://research.nvidia.com/labs/dir/edify-3d demo&#xff1a;https://build.nvidia.com/Shutterstock/edify-3d 文章目录 摘要一、前言二、多视图扩散模型2.1.消融研究 三、重建模型…

在SQLyog中导入和导出数据库

导入 假如我要导入一个xxx.sql&#xff0c;我就先创建一个叫做xxx的数据库。 然后右键点击导入、执行SQL脚本 选择要导入的数据库文件的位置&#xff0c;点击执行即可 注意&#xff1a; 导入之后记得刷新一下导出 选择你要导出的数据库 右键选择&#xff1a;备份/导出、…

HDR视频技术之三:色度学与颜色空间

HDR 技术的第二个理论基础是色度学。从前面的内容中可以了解到&#xff0c;光学以及人类视觉感知模型为人类提供了解释与分析人类感知亮度的理论基础&#xff0c;但是 HDR 技术不仅仅关注于提升图像与视频的亮度范围&#xff0c;同时也关注于提供更加丰富的色彩。因此&#xff…

通信与网络安全之IPSEC

IPSec&#xff08;IP Security&#xff09;是IETF制定的为保证在Internet上传送数据的安全保密性能的三层隧道加密协议。IPSec在网络层对IP报文提供安全服务。IPSec协议本身定义了如何在IP数据包中增加字段来保证IP包的完整性、 私有性和真实性&#xff0c;以及如何加密数据包。…

Redis的管道操作

在现代应用程序中&#xff0c;Redis作为一种高性能的内存数据库&#xff0c;被广泛用于缓存、消息队列、实时分析等场景。为了进一步提高Redis的性能&#xff0c;Redis提供了管道&#xff08;Pipeline&#xff09;操作&#xff0c;允许客户端将多个命令一次性发送到服务器&…

67 mysql 的 间隙锁

前言 我们这里主要是 来看一下 mysql 中的 间隙锁 间隙锁 主要存在的地方一般就是在 查询主键查询不到, 索引查询查询不到 的场景 然后 我们这里来调试一下 这里的整个流程, 间隙锁的加锁 以及 间隙锁的使用, 以及 间隙锁的释放 从逻辑上来说 间隙锁 锁定的是一个区间, 按照…

小米PC电脑手机互联互通,小米妙享,小米电脑管家,老款小米笔记本怎么使用,其他品牌笔记本怎么使用,一分钟教会你

说在前面 之前我们体验过妙享中心&#xff0c;里面就有互联互通的全部能力&#xff0c;现在有了小米电脑管家&#xff0c;老款的笔记本竟然用不了&#xff0c;也可以理解&#xff0c;毕竟老款笔记本做系统研发的时候没有预留适配的文件补丁&#xff0c;至于其他品牌的winPC小米…

Apache Zeppelin:一个基于Web的大数据可视化分析平台

今天给大家推荐一下 Apache Zeppelin&#xff0c;它是一个基于 Web 的交互式数据接入、数据分析、数据可视化以及协作文档 Notebook&#xff0c;类似于 Jupyter Notebook。 Apache Zeppelin 支持使用 SQL、Java、Scala、Python、R 等编程语言进行数据处理和分析&#xff0c;同时…

彻底理解如何保证ElasticSearch和数据库数据一致性问题

一.业务场景举例 需求&#xff1a; 一个卖房业务&#xff0c;双十一前一天&#xff0c;维护楼盘的运营人员突然接到合作开发商的通知&#xff0c;需要上线一批热门的楼盘列表&#xff0c;上传完成后&#xff0c;C端小程序支持按楼盘的名称、户型、面积等产品属性全模糊搜索热门…

EasyExcel: 结合springboot实现表格导出入(单/多sheet), 全字段校验,批次等操作(全)

全文目录,一步到位 1.前言简介1.1 链接传送门1.1.1 easyExcel传送门 2. Excel表格导入过程2.1 easyExcel的使用准备工作2.1.1 导入maven依赖2.1.2 建立一个util包2.1.3 ExcelUtils统一功能封装(单/多sheet导入)2.1.4 ExcelDataListener数据监听器2.1.5 ResponseHelper响应值处理…

前端实用知识-用express搭建本地服务器

目录 一、为什么会有这篇文章&#xff1f; 二、使用前的准备-如环境、工具 三、如何使用&#xff1f;-express常用知识点 四、代码演示-配合截图&#xff0c;简单易懂 一、为什么会有这篇文章&#xff1f; 在日常前端开发中&#xff0c;我们离不开数据&#xff0c;可能是用…

Redis(概念、IO模型、多路选择算法、安装和启停)

一、概念 关系型数据库是典型的行存储数据库&#xff0c;存在的问题是&#xff0c;按行存储的数据在物理层面占用的是连续存储空间&#xff0c;不适合海量数据存储。 Redis在生产中使用的最多的是用作数据缓存。 服务器先在缓存中查询数据&#xff0c;查到则返回&#xff0c;…

C#基础控制台程序

11.有一个54的矩阵&#xff0c;要求编程序求出其中值最大的那个元素的值&#xff0c;以及其所在的行号和列号。 12.从键盘输入一行字符&#xff0c;统计其中有多少个单词&#xff0c;单词之间用空格分隔开。 13.输入一个数&#xff0c;判断它是奇数还是偶数&#xff0c;如果…

Flink开发入门简单案例--统计实时流订单

Flink开发入门简单案例 0.简介1.订单数据生成器1.1 新建工程TestFlink1.2 在pom.xml中引入Flink依赖包1.3 订单数据生成类订单类&#xff08;Item&#xff09;订单生成数据流类测试订单生成类 2.订单统计2.1 仅统计订单中商品的件数 2.2 同时统计商品数量和金额 0.简介 本案例…

AI前景分析展望——GPTo1 SoraAI

引言 人工智能&#xff08;AI&#xff09;领域的飞速发展已不仅仅局限于学术研究&#xff0c;它已渗透到各个行业&#xff0c;影响着从生产制造到创意产业的方方面面。在这场技术革新的浪潮中&#xff0c;一些领先的AI模型&#xff0c;像Sora和OpenAI的O1&#xff0c;凭借其强大…