RabbitMQ实现死信队列

news2025/1/8 12:08:52

目录

  • 死信队列是什么
  • 怎样实现一个死信队列
    • 说明
    • 实现过程
      • 导入依赖
      • 添加配置
      • 编写mq配置类
      • 添加业务队列的消费者
      • 添加死信队列的消费者
      • 添加消息发送者
      • 添加消息测试类
      • 测试
  • 死信队列的应用场景
  • 总结

死信队列是什么

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  • 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  • 消息在队列的存活时间超过设置的TTL时间。
  • 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

怎样实现一个死信队列

说明

配置死信队列大概可以分为三个步骤:

1.配置业务队列,绑定到业务交换机上

2.为业务队列配置死信交换机和路由key

3.为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

实现过程

导入依赖

        <!--RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

添加配置

spring:      
  #rabbitmq
  rabbitmq:
    host: 83.136.16.134
    password: guest
    username: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual

编写mq配置类

代码里面有详细说明,这里不在赘述。

package com.miaosha.study.mq;

import com.sun.org.apache.regexp.internal.RE;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:16
 * @Version: 1.0
 */
@Configuration
public class RabbitmqConfig {


    /**
     * 业务交换机
     */
    public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";
    /**
     * 业务队列a
     */
    public static final String BUSINESS_QUEUEA_NAME = "business.queue.a";
    /**
     * 业务交换机b
     */
    public static final String BUSINESS_QUEUEB_NAME = "business.queue.b";
    /**
     * 死信交换机
     */
    public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
    /**
     * 死信队列a
     */
    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.queue.a";
    /**
     * 死信队列b
     */
    public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.queue.b";
    /**
     * 死信队列路由键a
     */
    public static final String DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME = "dead.letter.queue.a.rounting.key";
    /**
     * 死信队列路由键b
     */
    public static final String DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME = "dead.letter.queue.b.rounting.key";


    /**
     * 申明业务交换机
     * @return
     */
    @Bean
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    /**
     * 申明死信交换机
     * @return
     */
    @Bean
    public DirectExchange deadletterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
    }

    /**
     * 申明业务队列a
     * @return
     */
    @Bean
    public Queue queuea(){
        Map<String,Object> map = new HashMap<>();
        //绑定死信交换机
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
        //绑定的死信路由键
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();
    }

    /**
     * 申明业务队列b
     * @return
     */
    @Bean
    public Queue queueb(){
        Map<String,Object> map = new HashMap<>();
        //绑定死信交换机
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
        //绑定的死信路由键
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();
    }

    /**
     * 申明死信队列a
     * @return
     */
    @Bean
    public Queue deadletterQueuea(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    /**
     * 申明死信队列b
     * @return
     */
    @Bean
    public Queue deadletterQueueb(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    /**
     * 队列a绑定到业务交换机
     * @return
     */
    @Bean
    public Binding businessBindinga(){
        return BindingBuilder.bind(queuea()).to(businessExchange());
    }


    /**
     * 队列b绑定到业务交换机
     * @return
     */
    @Bean
    public Binding businessBindingb(){
        return BindingBuilder.bind(queueb()).to(businessExchange());
    }

    /**
     * 死信队列a绑定到死信交换机
     * @return
     */
    @Bean
    public Binding deadletterBindinga(){
        return BindingBuilder.bind(deadletterQueuea()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);
    }

    /**
     * 死信队列b绑定到死信交换机
     * @return
     */
    @Bean
    public Binding deadletterBindingB(){
        return BindingBuilder.bind(deadletterQueueb()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);
    }
}

添加业务队列的消费者

package com.miaosha.study.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEA_NAME;
import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEB_NAME;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:53
 * @Version: 1.0
 */
@Slf4j
@Component
public class RabbitmqReceiver {

    /**
     * 监听业务队列a
     * @param message
     */
    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void queuea(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("业务队列A接受到消息【{}】",msg);

        boolean ack = true;
        Exception exception = null;
        try {
            //这里模拟业务逻辑出现异常的情况
            if (msg.contains("fail")){
                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e){
            ack = false;
            exception = e;
        }

        //当ack为false时(业务逻辑出现异常),说明当前消息消费异常,这里直接放入死信队列
        if (!ack){
            log.error("业务队列A消费发生异常,error msg:{}", exception.getMessage());
            /**
             * void basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * 参数一:当前消息的唯一id
             * 参数二:是否针对多条消息
             * 参数三:是否从新入队列
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    /**
     * 监听业务队列b
     * @param msg
     */
    @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
    public void queueb(Message msg,Channel channel) throws Exception{
        String str = new String(msg.getBody());
        log.info("业务队列B接受到消息【{}】",str);
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    }
}

添加死信队列的消费者

package com.miaosha.study.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

import static com.miaosha.study.mq.RabbitmqConfig.*;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:58
 * @Version: 1.0
 */
@Slf4j
@Component
public class DeadLetterReceiver {

    /**
     * 监听业务队列a
     * @param msg
     */
    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void queuea(Message msg, Channel channel) throws IOException {
        String str = new String(msg.getBody());
        log.info("死信队列A接受到消息【{}】",str);
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        log.info("死信消息properties:{}", msg.getMessageProperties());
    }

    /**
     * 监听业务队列b
     * @param msg
     */
    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void queueb(Message msg, Channel channel) throws IOException {
        String str = new String(msg.getBody());
        log.info("死信队列B接受到消息【{}】",str);
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        log.info("死信消息properties:{}", msg.getMessageProperties());
    }
}

添加消息发送者

package com.miaosha.study.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_EXCHANGE_NAME;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:49
 * @Version: 1.0
 */
@Component
public class RabbitmqSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void sendMsg(String msg){
        rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"",msg);
    }
}

添加消息测试类

package com.miaosha.study.controller;

import com.miaosha.study.mq.RabbitmqSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: laz
 * @CreateTime: 2023-02-27  09:59
 * @Version: 1.0
 */
@RestController
@RequestMapping("mq")
public class TestController {

    @Autowired
    private RabbitmqSender rabbitmqSender;


    @RequestMapping("testDeadLetterQueue/{msg}")
    public void testDeadLetterQueue(@PathVariable("msg")String msg){
        rabbitmqSender.sendMsg(msg);
    }
}

测试

运行项目,访问:http://localhost:8081/mq/testDeadLetterQueue/msg

在这里插入图片描述

可以看到,此时只有业务消费者消费了消息,死信队列并没有消费到消息。

然后根据消费者里面的逻辑,我们发送一条 ‘fail’的消息,再次测试

访问:http://localhost:8081/mq/testDeadLetterQueue/fail

在这里插入图片描述

可以看到,死信队列a已收到消息。到此实现死信队列的流程就通了。

注意:我们的死信消息MessageProperties中的内容比较多,代表的含义分别是:

字段名含义
x-first-death-exchange第一次被抛入的死信交换机的名称
x-first-death-reason第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected 参数被设置为false。expired :消息过期。maxlen : 队列内消息数量超过队列最大容量
x-first-death-queue第一次成为死信前所在队列名称
x-death历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新

死信队列的应用场景

一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。

总结

死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

死信消息的生命周期:

  1. 业务消息被投入业务队列
  2. 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
  3. 被nck或reject的消息由RabbitMQ投递到死信交换机中
  4. 死信交换机将消息投入相应的死信队列
  5. 死信队列的消费者消费死信消息

本篇文章到此结束!希望对您有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/374246.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

单调栈(C/C++)

目录 1. 单调栈的定义 2. 单调栈的常见用途 3. 案例分析 3.1 暴力解法 3.2 单调栈 4. 单调栈总结 1. 单调栈的定义 单调栈顾名思义&#xff0c;就是栈内的元素是单调的。根据栈内元素的单调性的不同&#xff0c;可以分为&#xff1a; 单调递增栈&#xff1a;栈内元素是单…

LeetCode 105. 从前序与中序遍历序列构造二叉树 106. 从中序与后序遍历序列构造二叉树

为什么前序和中序或者中序和后序&#xff0c;两两组合能构建一个二叉树&#xff1f; 因为前序和后序可以确定根&#xff0c;而中序可以划分出左右区间。 文章目录从前序与中序遍历序列构造二叉树从中序与后序遍历序列构造二叉树从前序与中序遍历序列构造二叉树 难度 中等 题目链…

基于java的进销库存管理系统(Vue+Springboot+Mysql)前后端分离项目,附万字课设论文

1.3 系统实现的功能 本次设计任务是要设计一个超市进销存系统&#xff0c;通过这个系统能够满足超市进销存系统的管理及员工的超市进销存管理功能。系统的主要功能包括&#xff1a;首页、个人中心、员工管理、客户管理、供应商管理、承运商管理、仓库信息管理、商品类别管理、 …

TS泛型,原来就这?

一、泛型是什么&#xff1f;有什么作用&#xff1f; 当我们定义一个变量不确定类型的时候有两种解决方式&#xff1a; 使用any 使用any定义时存在的问题&#xff1a;虽然知道传入值的类型但是无法获取函数返回值的类型&#xff1b;另外也失去了ts类型保护的优势 使用泛型 泛型…

记一次线上es慢查询导致的服务不可用

现象 某日线上业务同学反馈订单列表查询页面一直loding&#xff0c;然后提示请求超时&#xff0c;几分钟之后恢复正常 接到报障之后&#xff0c;马上根据接口URL&#xff0c;定位到了请求链路&#xff0c;发现是es查询超时&#xff0c;这里我们的业务订单表数据是由几百万的&a…

【数据结构】时间复杂度和空间复杂度以及相关OJ题的详解分析

​ ​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;数据结构 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录1.算法效率1.1 如何衡…

独家 | Gen-1——可以改变视频风格的AI模型

翻译&#xff1a;吴振东校对&#xff1a;张睿毅本文约1000字&#xff0c;建议阅读3分钟 本文简单介绍了Runway公司的发展史&#xff0c;以及他们新推出的生成式AI模型Gen-1&#xff0c;可用于通过应用文本提示或者参考图像所指定的任意风格&#xff0c;将现有视频转换为新视频。…

php mysql高校教材管理系统

我的目标就是在于开发一个功能实用、操作方便&#xff0c;简单明了的管理系统&#xff1b;其能够录入教师个人的信息&#xff0c;教导主任信息&#xff0c;在操作上能够完成诸如添加、修改、删除、按各种条件进行查询、等方面的工作&#xff0c;基本满足学校的日常业务的需求. …

System V|共享内存基本通信框架搭建|【超详细的代码解释和注释】

前言 那么这里博主先安利一下一些干货满满的专栏啦&#xff01; 手撕数据结构https://blog.csdn.net/yu_cblog/category_11490888.html?spm1001.2014.3001.5482这里包含了博主很多的数据结构学习上的总结&#xff0c;每一篇都是超级用心编写的&#xff0c;有兴趣的伙伴们都支…

string和自动推断类型

欢迎来观看温柔了岁月.c的博客目前设有C学习专栏C语言项目专栏数据结构与算法专栏目前主要更新C学习专栏&#xff0c;C语言项目专栏不定时更新待C专栏完毕&#xff0c;会陆续更新C项目专栏和数据结构与算法专栏一周主要三更&#xff0c;星期三&#xff0c;星期五&#xff0c;星…

【项目管理】项目进度管理中的逻辑关系

项目的进度管理是项目核心管理之一&#xff0c;通过合理的进度安排&#xff0c;制定出科学可行的分项工期表&#xff0c;并条理清晰的显示出项目进度之间的逻辑关系。 1、目标是计划的灵魂 进度计划必须按照确定的项目总进度要求进行编制&#xff0c;了解项目总目标和整体安…

网络安全——数据链路层安全协议(2)

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​​ 目录 前言 一.局域网数据链路层安全协议 1.IEEE 802.10 &#xff08;1&#xff09;IEE…

JavaWeb HTTP和Maven

4、Http 4.1、什么是HTTP HTTP&#xff08;超文本传输协议&#xff09;是一个简单的请求-响应协议&#xff0c;它通常运行在TCP之上。 文本&#xff1a;html&#xff0c;字符串&#xff0c;~ ….超文本&#xff1a;图片&#xff0c;音乐&#xff0c;视频&#xff0c;定位&am…

登峰造极,师出造化,Pytorch人工智能AI图像增强框架ControlNet绘画实践,基于Python3.10

人工智能太疯狂&#xff0c;传统劳动力和内容创作平台被AI枪毙&#xff0c;弃尸尘埃。并非空穴来风&#xff0c;也不是危言耸听&#xff0c;人工智能AI图像增强框架ControlNet正在疯狂地改写绘画艺术的发展进程&#xff0c;你问我绘画行业未来的样子&#xff1f;我只好指着Cont…

jdbc模板的基本使用

1.JdbcTemplate的开发步骤 <1>导入spring-jdbc和spring-tx坐标 <2>创建数据库表和实体 <3>创建JdbcTemplate对象 <4>执行数据库 2.JdbcTemplate快速入门 <1>导入坐标 <dependency><groupId>org.springframework</groupId><…

【Python学习笔记】第十七节 Python 异常处理

Python 异常在任何一种编程语言中&#xff0c;都会有异常处理机制&#xff0c;python也不例外&#xff0c;它提供了两个非常重要的功能来处理python程序在运行中出现的异常和错误。Python 异常处理异常的概念&#xff1a;在程序运行过程中&#xff0c;由于代码错误或者运行环境…

Java数据结构与算法——手撕LRULFU算法

LRU算法 力扣146&#xff1a;https://leetcode-cn.com/problems/lru-cache/ 讲解视频&#xff1a;https://www.bilibili.com/video/BV1Hy4y1B78T?p65&vd_source6f347f8ae76e7f507cf6d661537966e8 LRU是Least Recently Used的缩写&#xff0c;是一种常用的页面置换算法&…

Typora图床配置:Typora + PicGo + 阿里云OSS

文章目录一、前景提要二、相关链接三、搭建步骤1. 购买阿里云对象存储OSS2. 对象存储OSS&#xff1a;创建Bucket3. 阿里云&#xff1a;添加OSS访问用户及权限4. 安装Typora5. 配置PicGo方法一&#xff1a;使用PicGo-Core (Command line)方法二&#xff1a;使用PicGo(app)6. 最后…

C语言深度剖析:关键字

C语言深度剖析:关键字C语言深度剖析:关键字前言定义与声明&#xff08;补充内容&#xff09;最宏大的关键字-auto最快的关键字-register关键字static被冤枉的关键字-sizeof整型在内存中的存储原码、反码、补码大小端补充理解变量内容的存储和取出为什么都是补码整型取值范围关于…

多线程的初识和创建

✨个人主页&#xff1a;bit me&#x1f447; ✨当前专栏&#xff1a;Java EE初阶&#x1f447; ✨每日一语&#xff1a;知不足而奋进&#xff0c;望远山而前行。 目 录&#x1f4a4;一. 认识线程&#xff08;Thread&#xff09;&#x1f34e;1. 线程的引入&#x1f34f;2. 线程…