基于Springboot跟rabbitmq实现的死信队列

news2024/11/26 1:35:47

概述

RabbitMQ是流行的开源消息队列系统,使用erlang语言开发。为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。但由于对死信队列的概念及配置不熟悉,导致曾一度陷入百度的汪洋大海,无法自拔,很多文章都看起来可行,但是实际上却并不能帮我解决实际问题。最终,在官网文档中找到了我想要的答案,通过官网文档的学习,才发现对于死信队列存在一些误解,导致配置死信队列之路困难重重。

详细

一、运行效果

image.png

image.png

二、实现过程

①、先创建一个Springboot项目。然后在pom文件中添加 spring-boot-starter-amqp 和 spring-boot-starter-web的依赖,接下来创建一个Config类,这里是关键:

package com.zyf.rabbitmqdeadletterdemo.config;
 
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
public class RabbitMQConfig {
 
    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.dead.letter.business.exchange";
 
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.dead.letter.business.queueA";
    public static final String BUSINESS_QUEUEB_NAME = "rabbitmq.dead.letter.business.queueB";
 
    public static final String DEAD_LETTER_EXCHANGE = "rabbitmq.dead.letter.deadletter.exchange";
 
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueA.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueB.routingkey";
 
    public static final String DEAD_LETTER_QUEUEA_NAME = "rabbitmq.dead.letter.deadletter.queueA";
    public static final String DEAD_LETTER_QUEUEB_NAME = "rabbitmq.dead.letter.deadletter.queueB";
 
    // 声明业务Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }
 
    // 声明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
 
    // 声明业务队列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
 
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
    }
 
    // 声明业务队列B
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
    }
 
    // 声明死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }
 
    // 声明死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }
 
    // 声明业务队列A绑定关系
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
 
    // 声明业务队列B绑定关系
    @Bean
    public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
 
    // 声明死信队列A绑定关系
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }
 
    // 声明死信队列B绑定关系
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

②、接下来,是业务队列的消费代码:

@Slf4j@Componentpublic class BusinessMessageReceiver {    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到业务消息A:{}", msg);        boolean ack = true;
        Exception exception = null;        try {            if (msg.contains("deadletter")){                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e){
            ack = false;
            exception = e;
        }        if (!ack){
            log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }    @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到业务消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

③、然后配置死信队列的消费者:

@Componentpublic class DeadLetterMessageReceiver {    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

④、为了方便测试,写一个简单的消息生产者,并通过controller层来生产消息。

@Componentpublic class BusinessMessageSender {    @Autowired
    private RabbitTemplate rabbitTemplate;    public void sendMsg(String msg){
        rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
    }
}
@RequestMapping("rabbitmq")@RestControllerpublic class RabbitMQMsgController {    @Autowired
    private BusinessMessageSender sender;    @RequestMapping("sendmsg")
    public void sendMsg(String msg){
        sender.sendMsg(msg);
    }
}

三、项目结构图

image.png

四、补充总结

死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

总结一下死信消息的生命周期:

  1. 业务消息被投入业务队列

  2. 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作

  3. 被nck或reject的消息由RabbitMQ投递到死信交换机中

  4. 死信交换机将消息投入相应的死信队列

  5. 死信队列的消费者消费死信消息

死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当你明白了这些之后,这些Exchange和Queue想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/977222.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自然语言处理实战项目17-基于多种NLP模型的诈骗电话识别方法研究与应用实战

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下自然语言处理实战项目17-基于NLP模型的诈骗电话识别方法研究与应用&#xff0c;相信最近小伙伴都都看过《孤注一掷》这部写实的诈骗电影吧&#xff0c;电影主要围绕跨境网络诈骗展开&#xff0c;电影取材自上万起真…

PPO代码研究(2)

好&#xff0c; 因为我没怎么看懂&#xff0c; 所以我决定再看一遍PPO的代码&#xff0c; 再研究一遍。 事实证明&#xff0c; 重复是一个非常好&#xff0c;非常好的方法。 学习方法。 世界上几乎没有任何新知识是你一遍就能学会的。 你只能学一遍&#xff0c;再来一遍&…

大剧院订座系统源码,大剧院订票,大剧院场馆租赁,大剧院订票系统完整源码

大剧院订座系统源码,大剧院订票&#xff0c;大剧院场馆租赁&#xff0c;大剧院订票系统完整源码 大剧院系统1、管理后台--系统说明2、订票小程序--系统说明3、验票端--系统说明4、系统源码说明 大剧院系统 1、管理后台–系统说明 项目管理&#xff1a;用于创建剧院演出项目 2…

【广州华锐互动】AR技术在配电系统运维中的应用

随着科技的不断发展&#xff0c;AR(增强现实)技术逐渐走进了我们的生活。在电力行业&#xff0c;AR技术的应用也为巡检工作带来了许多新突破&#xff0c;提高了巡检效率和安全性。本文将从以下几个方面探讨AR配电系统运维系统的新突破。 首先&#xff0c;AR技术可以实现虚拟巡检…

Qt应用开发(基础篇)——按钮基类 QAbstractButton

一、前言 QAbstractButton类&#xff0c;继承于QWidget&#xff0c;是Qt按钮小部件的抽象基类&#xff0c;提供按钮常用的功能。 QAbstractButton按钮基类&#xff0c;它的子类(pushbutton、checkbox、toolbutton等)处理用户操作&#xff0c;并指定按钮的绘制方式。QAbstractBu…

el-table中加图标文字提示

<el-table :data"tableData" style"width: 100%" max-height"250"><el-table-column fixed prop"aaa" label"日期" width"150" /><el-table-column prop"bbb" label"日期" wi…

英语语法基础--思维导图

思维导图通常用于可视化和整理信息&#xff0c;而英文语法非常广泛且复杂&#xff0c;无法在一个简单的思维导图中完整表示。然而&#xff0c;我可以提供一个简化版本的英文语法思维导图&#xff0c;列出一些主要的语法概念和部分示例。 请注意&#xff0c;这只是一个基本的概…

多个pdf怎么合并在一起?跟着我的步骤一起合并

多个pdf怎么合并在一起&#xff1f;利用PDF文档合并功能可以帮助您更有效地管理文件&#xff0c;将多个相关文件整合成一个文件&#xff0c;避免分散在多个文件中。此外&#xff0c;合并后的文件更便于共享和传输&#xff0c;因为只需共享一个文件而不是多个文件。虽然合并文件…

自学Python01-创建文件写入内容

此处省去安装和前言&#xff0c;需要两个东西 一个去下载安装python官方库 Welcome to Python.org 一个是编译器pycharm PyCharm 安装教程&#xff08;Windows&#xff09; | 菜鸟教程 PyCharm: the Python IDE for Professional Developers by JetBrains 第一节 练习print…

18--Elasticsearch

一 Elasticsearch介绍 1 全文检索 Elasticsearch是一个全文检索服务器 全文检索是一种非结构化数据的搜索方式 结构化数据&#xff1a;指具有固定格式固定长度的数据&#xff0c;如数据库中的字段。 非结构化数据&#xff1a;指格式和长度不固定的数据&#xff0c;如电商网站…

rocky(centos) 安装redis,并设置开机自启动

一、下载并安装 1、官网下载Redis 并安装 Download | RedisRedisYou can download the last Redis source files here. For additional options, see the Redis downloads section below.Stable (7.2)Redis 7.2 …https://redis.io/download/ 2、上传下载好的redis压缩包到 /…

电气工程中重要的测量术语:“kVRMS” | 百能云芯

在电气工程和电子领域&#xff0c;术语“kVRMS”至关重要。它是工程师和技术人员用来准确评估电气系统电压的关键测量方法。在这篇综合文章中&#xff0c;我们将深入探讨 kVRMS 的含义、其意义、应用。 kVRMS 代表“千伏均方根”。为了理解这个术语&#xff0c;我们来分解一下&…

【Java Web】统一处理异常

一个异常处理的ControllerAdvice类。它用于处理Controller注解的控制器中发生的异常。 具体代码功能如下&#xff1a; 导入相关类和方法。声明一个Logger对象&#xff0c;用于日志记录。使用ExceptionHandler注解标记handleException方法&#xff0c;用于处理所有异常。 -嘛在…

管网水位监测的必要性

城市燃气、桥梁、供水、排水、热力、电力、电梯、通信、轨道交通、综合管廊、输油管线等&#xff0c;担负着城市的信息传递、能源输送、排涝减灾等重要任务&#xff0c;是维系城市正常运行、满足群众生产生活需要的重要基础设施&#xff0c;是城市的生命线。基础设施生命线就像…

centos+jenkins+pycharm

思路&#xff1a;架构 一. 在centos上搭建jenkins环境 二. pycharm与gitee建立连接 三. 访问jenkins&#xff0c;添加任务 3.1 添加一个自由风格的任务 3.2 添加git项目路径及访问git的账号和密码 3.3 执行start.sh脚本 四. 浏览器访问jenkins执行任务

leetcode-779. 第K个语法符号(java)

第K个语法符号 题目描述递归代码演示 题目描述 难度 - 中等 LC- 779. 第K个语法符号 我们构建了一个包含 n 行( 索引从 1 开始 )的表。首先在第一行我们写上一个 0。接下来的每一行&#xff0c;将前一行中的0替换为01&#xff0c;1替换为10。 例如&#xff0c;对于 n 3 &#…

个人博客系统-测试用例+自动化测试

一、个人博客系统测试用例 二、自动化测试 使用selenium4 Junit5单元测试框架&#xff0c;来进行简单的自动化测试。 1. 准备工作 &#xff08;1&#xff09;引入依赖&#xff0c;此时的pom.xml文件&#xff1a; <?xml version"1.0" encoding"UTF-8&quo…

在Mac电脑的终端程序中打开进入指定的系统/文件目录

例如&#xff1a;想直接在终端中打开repository目录&#xff0c;可以使用open 使用后可以看到打开了文件目录&#xff1a;

敏捷开发:适应变化的核心能力

​在当今高度变化的时代&#xff0c;软件开发的环境和要求也在不断变化。传统的开发方法往往难以适应这种快速变化&#xff0c;因此&#xff0c;一种新的软件开发方法——敏捷开发逐渐得到了广泛的关注和应用。 本文将介绍敏捷开发的概念、优势、实践经验、敏捷开发工具以及注…

三相三线电表和三相四线电表有什么区别

三相三线电表和三相四线电表是两种常见的电能计量仪表&#xff0c;它们在结构、接线方式和使用范围上有所不同。本文将从以下几个方面详细介绍两者之间的区别。 一、结构上的区别 1.三相三线电表&#xff1a;三相三线电表主要由电压线圈、电流线圈、转子、铝盘和外壳等部分组成…