spring boot + rabbitMq整合之死信队列(DL)

news2025/1/12 18:59:43

rabbit mq 死信队列

什么是死信队列?

DL-Dead Letter 死信队列

死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  1. 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。通常来说,如果consumer如果消费出现异常,并且没有ack的话,也属于这种情况
  2. 消息在队列的存活时间超过设置的生存时间(TTL)时间。
  3. 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

流程

在这里插入图片描述

publisher端

下面是生产者需要做的

代码实现

  1. 创建message 相关
@Configuration
public class DXLOrderMessageConfig {

    public static final String DXL_ORDER_QUEUE = "dxl-order-queue";
    public static final String DXL_ORDER_EXCHANGE = "dxl-order-exchange";

    public static final String DXL_ORDER_KEY = "dxl-order-routingKey";

    public static final String ORDER_QUEUE = "order-queue";

    public static final String ORDER_ROUTING_KEY = "order-routingKey";

    public static final String ORDER_EXCHANGE = "order-exchange";

    @Bean
    Queue DXLOrderQueue() {
        return new Queue(DXL_ORDER_QUEUE, true, false, false);
    }

    @Bean
    DirectExchange DXLOrderExchange() {
        return new DirectExchange(DXL_ORDER_EXCHANGE, true, false);
    }

    @Bean
    Binding dxlBinding() {

        return BindingBuilder.bind(DXLOrderQueue()).to(DXLOrderExchange()).with(DXL_ORDER_KEY);
    }

    @Bean
    DirectExchange OrderExchange() {
        Map<String, Object> deadLetterParams = new HashMap<>(2);
        return new DirectExchange(ORDER_EXCHANGE, true, false, deadLetterParams);
    }

    @Bean
    Queue OrderQueue() {
        Map<String, Object> args = new HashMap<>(1);
        // ms,指定对应的死信队列配置,和该消息的有效期限,这里设置的是60分钟
        args.put("x-message-ttl", 60000);
        //        x-max-length:队列最大容纳消息条数,大于该值,mq拒绝接受消息,消息进入死信队列
		//        args.put("x-max-length", 5);
        args.put("x-dead-letter-exchange", DXL_ORDER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DXL_ORDER_KEY);
        return new Queue(ORDER_QUEUE, true, false, false, args);
    }


    @Bean
    Binding bindingOrder() {
        return BindingBuilder.bind(OrderQueue()).to(OrderExchange()).with(ORDER_ROUTING_KEY);
    }
  1. 发送正常的mq,为了支持自定义的测试case,增加了自定义的orderNo
 @GetMapping("/create-order/{orderNo}")
    public String createOrder(@PathVariable("orderNo") String orderNo) {
        // 发送持久化mq
        Map<String, Object> orderMessage = getMqMessage();
        if (StringUtils.isEmpty(orderNo)) {
            orderNo = "order-" + RandomUtils.nextInt(1, 100);
        }
        orderMessage.put("orderNo", orderNo);
        rabbitTemplate.convertAndSend(DXLOrderMessageConfig.ORDER_EXCHANGE, DXLOrderMessageConfig.ORDER_ROUTING_KEY, orderMessage);

        return "ok";
    }

调用url: http://localhost:9876/rabbit-mq-sender/create-order/xxxxx即可测试

测试

我们发送一个正常的订单message
在这里插入图片描述
在一分钟后,如果不被消费的话,那么就会自动进入binding的DL中
message自动转入DL中

消费端

在消费端,通常我们需要两个消费者,一个正常的业务消费者,一个dlx的消费者,我们首先定义一个消费异常的cosumer,那么消息在异常之后会进入dead letter

代码实现

业务消费

正常的业务代码-OrderChannelListener.java,测试使用

@Service
public class OrderChannelListener extends Loggable implements ChannelAwareMessageListener {
    @RabbitListener(queues = "order-queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            business(message);
            // 手动签收
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            //拒绝消费消息(丢失消息) 给死信队列
            channel.basicReject(deliveryTag, false);
        }

    }

    private void business(Message orderMessage) {
        String body = new String(orderMessage.getBody(), StandardCharsets.UTF_8);
        Map<String, Object> messageInMap = JsonUtils.toMap(body);
        String orderNo = (String) messageInMap.get("orderNo");
        logger.info("DirectReceiver[OrderQueue]正常订单消费者收到消息  : " + orderNo);
        if (orderNo != null && orderNo.contains("404")) {
            throw new RuntimeException("illegal order no");
        }
    }
}
dead letter消费者

死信队列的消费 DLXOrderChannelListener.class

@Service
public class DLXOrderChannelListener extends Loggable implements ChannelAwareMessageListener {
    @RabbitListener(queues = "dlx-order-queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String s = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.info("订单dead letter queue[dead letter queue]订单消费者收到消息  : " + s);
        logger.info("开始处理超时的订单.....");
        logger.info("结束处理超时的订单.....");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

我们调用http://localhost:9876/rabbit-mq-sender/create-order/404,我们期待,在正常消费的时候,发生异常,然后会调到死信队列的消费者中去了,我们看日志

在这里插入图片描述
和预测的一模一样。

实际应用

我们都知道,下单过程中,一般会有半个小时的支付时间,这个时间段,订单会占用库存,那么如果过了这个时间段不支付的话,我们会取消该订单并且释放库存。为了实现这个目的,通常我们会有两个问题

  1. 后台任务轮询
  2. mq的死信队列,利用mq信息的ttl,来驱动

流程图如下
在这里插入图片描述
从流程图上我们可以看出,我们只需要一个死信队列的消费者就好了,我们使用mq的有效期,ttl去实现定时的mq到死信队列的状态

代码实现
服务器端
@GetMapping("/save-order/{orderNo}")
    public String simulateCreateOrder(@PathVariable("orderNo") String orderNo) {
        try {
            insertOrderTable();
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        // 发送mq去检测延时检测状态
        sendCheckMq(orderNo);
        return "ok";
    }

    private void insertOrderTable() throws InterruptedException {
        logger.info("start to create order ....");
        TimeUnit.SECONDS.sleep(1);
        logger.info("inserted to order table and occupy the stock");
    }

    private void sendCheckMq(String orderNo) {
        // 发送持久化mq
        Map<String, Object> orderMessage = getMqMessage();
        if (StringUtils.isEmpty(orderNo)) {
            orderNo = "order-" + RandomUtils.nextInt(1, 100);
        }
        orderMessage.put("orderNo", orderNo);
        rabbitTemplate.convertAndSend(DXLOrderMessageConfig.ORDER_EXCHANGE, DXLOrderMessageConfig.ORDER_ROUTING_KEY, orderMessage);
    }

死信队列端

死信端和上述的dead letter代码没啥区别,但是业务的消费者需要注释掉,只能通过ttl来触发正常的mq跳转到死信队列,参考DLXOrderChannelListener.class

代码参考

https://github.com/GitHubsteven/spring-in-action2.0/tree/master/spring-boot-mesage

参考文档

  1. Spring Boot + RabbitMQ Tutorial - Retry and Error Handling Example | JavaInUse
  2. RabbitMQ死信队列实战——解决订单超时未支付

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/384696.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt 解决程序全屏运行弹窗引发任务栏显示

文章目录摘要在VM虚拟机器中测试setWindowFlags()关键字&#xff1a; Qt、 Qt::WindowStayOnTopHint、 setWindowFlags、 Qt::Window、 Qt::Tool摘要 今天眼看项目就要交付了&#xff0c;结果在测试程序的时候&#xff0c;发现在程序全品情况下&#xff0c;点击输入框&#x…

【Android Studio】【学习笔记】【2023春】

文章目录零、常用一、界面布局疑问&报错零、常用 一、界面布局 Android——六大基本布局总结/CSDN小马 同学 【Android】线性布局&#xff08;LinearLayout&#xff09;最全解析/CSDNTeacher.Hu 一个不错的计算器界面&#x1f447; Android Studio App LinearLayout多层…

数据资产管理建设思考(二)

关于数据资产管理&#xff0c;近两年是数据治理行业中一个热点话题&#xff0c;当然有我们前面提到的国家的政策支持及方向指引的原因。另一方面我们做数据治理的同行们从学习吸收国外优秀的数据治理理论&#xff0c;进一步在实践中思考如何应用理论&#xff0c;并结合我们国家…

docker(二)镜像详解、镜像构建、镜像优化

文章目录前言一、docker镜像详解1.镜像分层结构2.镜像的表示二、镜像构建1.commit提交2.DockerfileDockerfile 命令详解三、镜像优化1.缩减镜像层2.多阶段构建3.使用最精简的基础镜像前言 一、docker镜像详解 1.镜像分层结构 共享宿主机的kernelbase镜像提供的是最小的Linux发…

【LeetCode】1487. 保证文件名唯一

1487. 保证文件名唯一 题目描述 给你一个长度为 n 的字符串数组 names 。你将会在文件系统中创建 n 个文件夹&#xff1a;在第 i 分钟&#xff0c;新建名为 names[i] 的文件夹。 由于两个文件 不能 共享相同的文件名&#xff0c;因此如果新建文件夹使用的文件名已经被占用&a…

pytorch-模型训练中过拟合和欠拟合问题。从模型复杂度和数据集大小排查问题

评价了机器学习模型在训练数据集和测试数据集上的表现。如果你改变过实验中的模型结构或者超参数&#xff0c;你也许发现了&#xff1a;当模型在训练数据集上更准确时&#xff0c;它在测试数据集上却不一定更准确。这是为什么呢&#xff1f; 训练误差和泛化误差 在解释上述现象…

常用Swagger注解汇总

常用Swagger注解汇总 前言 在实际编写后端代码的过程中&#xff0c;我们可能经常使用到 swagger 注解&#xff0c;但是会用不代表了解&#xff0c;你知道每个注解都有什么属性吗&#xff1f;你都用过这些属性吗&#xff1f;了解它们的作用吗&#xff1f;本文在此带大家总结一下…

6-2 SpringCloud快速开发入门:声明式服务消费 Feign实现消费者

声明式服务消费 Feign实现消费者 使用 Feign实现消费者&#xff0c;我们通过下面步骤进行&#xff1a; 第一步&#xff1a;创建普通 Spring Boot工程 第二步&#xff1a;添加依赖 <dependencies><!--SpringCloud 集成 eureka 客户端的起步依赖--><dependency>…

图解LeetCode——剑指 Offer 34. 二叉树中和为某一值的路径

一、题目 给你二叉树的根节点 root 和一个整数目标和 targetSum &#xff0c;找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。叶子节点 是指没有子节点的节点。 二、示例 2.1> 示例 1&#xff1a; 【输入】root [5,4,8,11,null,13,4,7,2,null,null,5,1], t…

从 ChatGPT 爆火回溯 NLP 技术

ChatGPT 火遍了全网&#xff0c;多个话题频频登上热搜。见证了自然语言处理&#xff08;NLP&#xff09;技术的重大突破&#xff0c;体验到通用技术的无限魅力。GPT 模型是一种 NLP 模型&#xff0c;使用多层变换器&#xff08;Transformer&#xff09;来预测下一个单词的概率分…

cuda编程以及GPU基本知识

目录CPU与GPU的基本知识CPU特点GPU特点GPU vs. CPU什么样的问题适合GPU&#xff1f;GPU编程CUDA编程并行计算的整体流程CUDA编程术语&#xff1a;硬件CUDA编程术语&#xff1a;内存模型CUDA编程术语&#xff1a;软件线程块&#xff08;Thread Block&#xff09;网格&#xff08…

新界面Moonbeam DApp上线,替你先尝试了一番!

作者&#xff1a;充电中的小恐龙 请注意&#xff0c;本篇内容来自Moonbeam社区成员的无偿分享&#xff0c;与Moonbeam官方和Moonbeam中文社区无关。本文内容仅供参考&#xff0c;对于内容的准确性和实效性&#xff0c;请自行谨慎判断。 本文撰写于DApp上线Beta版本之时&#…

数据库开发(一文概括mysql基本知识)

Mysql 是最流行的关系型数据库管理系统&#xff0c;在 WEB 应用方面 MySQL 是最好的 关系型数据库(Relational Database Management System&#xff1a;关系数据库管理系统)应用软件之一。mysql在问开发中&#xff0c;几乎必不可少&#xff0c;因为其他的可能是要收费的&#x…

【运维】Linux定时任务 定时执行脚本

【运维】Linux定时任务 定时执行脚本 在安装完成操作系统后&#xff0c;默认会安装 crond 服务工具&#xff0c;且 crond 服务默认就是自启动的。crond 进程每分钟会定期检查是否有要执行的任务&#xff0c;如果有&#xff0c;则会自动执行该任务。 五分钟执行一次sh脚本 进入编…

taobao.item.update.delisting( 商品下架 )

&#xffe5;开放平台基础API必须用户授权 单个商品下架输入的num_iid必须属于当前会话用户 公共参数 请求地址: HTTP地址 http://gw.api.taobao.com/router/rest 公共请求参数: 公共响应参数: 点击获取key和secret 请求参数 响应参数 请求示例 TaobaoClient client new …

6-3 SpringCloud快速开发入门: Feign实现负载均衡的服务消费

Feign实现负载均衡的服务消费 负载均衡&#xff1a;Spring Cloud 提供了 Ribbon来实现负载均衡&#xff0c;使用 Ribbo直接注入一个 RestTemplate对象即可&#xff0c;RestTemplate已经做好了负载均衡的配置&#xff1b; 在 Spring Cloud下&#xff0c;使用 Feign也是直接可以实…

实时的软件生成 —— Prompt 编程打通低代码的最后一公里?

PS&#xff1a;这也是一篇畅想&#xff0c;虽然经过了一番试验&#xff0c;依旧有一些不足&#xff0c;但是大体上站得住脚。传统的软件生成方式需要程序员编写大量的代码&#xff0c;然后进行测试、发布等一系列繁琐的流程。而实时生成技术则是借助人工智能技术&#xff0c;让…

在CANoe/CANalyzer中观察CAN Message报文的周期Cycle

案例背景&#xff1a; 该篇博文将告诉您&#xff0c;如何直观的&#xff0c;图示化的&#xff0c;查看CAN网络中各CAN Message报文的周期变化。 优质博文推荐阅读&#xff08;单击下方链接&#xff0c;即可跳转&#xff09;&#xff1a; Vector工具链 CAN Matrix DBC CAN M…

市场份额下降,股价暴跌,金山云要想实现盈利还需要更长的时间

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 IPO后股价表现与估值变化 金山云&#xff08;KC&#xff09;于2020年5月在纳斯达克进行了IPO上市&#xff0c;当时IPO价格为17美元。根据S&P Capital IQ的数据&#xff0c;截至2020年6月1日&#xff0c;金山云的股价已…

linux基本功系列之uname实战

文章目录前言一. uname命令介绍二. 语法格式及常用选项三. 参考案例3.1 输出全部信息3.2 输出内核名称及版本3.3 输出网络节点的主机名3.4 输出主机硬件架构3.5 输出操作系统名称3.6 显示版本信息总结前言 大家好&#xff0c;又见面了&#xff0c;我是沐风晓月&#xff0c;本文…