SpringBoot整合RabbitMQ实现死信队列

news2024/11/25 13:03:03

文章目录

  • 概念介绍
    • 什么是死信
    • 死信队列应用
  • 工程搭建
    • 环境说明
    • 搭建步骤
  • 实现死信
    • 准备Exchange&Queue
    • 监听死信队列
    • 方式一——消费者拒绝&否认
    • 方式二——超过消息TTL
    • 方式三——超过队列长度限制
  • 代码仓库

前面一文通过 Java整合RabbitMQ实现生产消费(7种通讯方式),本文基于SpringBoot实现RabbitMQ中的死信队列和延迟队列。

概念介绍

什么是死信

死信可以理解成没有被正常消费的消息,在RabbitMQ中以下几种情况会被认定为死信:

  1. 消费者使用basic.reject或basic.nack(重新排队参数设置为false)对消息进行否定确认。
  2. 消息到达生存时间还未被消费。
  3. 队列超过长度限制,消息被丢弃。

这些消息会被发送到死信交换机并路由到死信队列中(在RabbitMQ中死信交换机和死信队列就是普通的交换机和队列)。其流转过程如下图
在这里插入图片描述

死信队列应用

  • 作为消息可靠性的一个扩展。比如,在队列已满的情况下也不会丢失消息。
  • 可以实现延迟消费功能。比如,订单15分钟内未支付。

注意事项:基于死信队列实现的延迟消费不适合时间过于复杂的场景。比如,一个队列中第一条消息TTL为10s,第二条消息TTL为5s,由于RabbitMQ只会监听第一条消息,所以本应第二条消息先达到TTL会在第一条消息的TTL之后。对于该现象有两种解决方案:

  1. 维护多个队列,每个队列维护一个TTL时间。
  2. 使用延迟交换机。这种方式需要下载插件支持,参考链接:RabbitMQ插件

工程搭建

环境说明

  • RabbitMQ环境,参考RabbitMQ环境搭建
  • Java版本:JDK1.8
  • Maven版本:apache-maven-3.6.3
  • 开发工具:IntelliJ IDEA

搭建步骤

  1. 创建SpringBoot项目。
  2. pom.xml文件导入RabbitMQ依赖。
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  1. application.yml文件添加RabbitMQ配置。
spring:
  # rabbitmq配置信息 RabbitProperties类
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 开启confirm机制
    publisher-confirm-type: correlated
    # 开启return机制
    publisher-returns: true
    #全局配置,局部配置存在就以局部为准
    listener:
      simple:
        acknowledge-mode: manual # 手动ACK

实现死信

准备Exchange&Queue

@Configuration
public class RabbitMQConfig {

    /**
     * 正常队列
     */
    public static final String EXCHANGE = "boot-exchange";

    public static final String QUEUE = "boot-queue";

    public static final String ROUTING_KEY = "boot-rout";

    /**
     * 死信队列
     */
    public static final String DEAD_EXCHANGE = "dead-exchange";

    public static final String DEAD_QUEUE = "dead-queue";

    public static final String DEAD_ROUTING_KEY = "dead-rout";

    /**
     * 声明死信交换机
     *
     * @return
     */
    @Bean
    public Exchange deadExchange() {
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();
    }

    /**
     * 声明死信队列
     *
     * @return
     */
    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }


    /**
     * 绑定死信的队列和交换机
     *
     * @param deadExchange
     * @param deadQueue
     * @return
     */
    @Bean
    public Binding deadBind(Exchange deadExchange, Queue deadQueue) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }

    /**
     * 声明交换机,同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
     *
     * @return
     */
    @Bean
    public Exchange bootExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE).build();
    }

    /**
     * 声明队列,同channel.queueDeclare(QUEUE, true, false, false, null);
     * 绑定死信交换机及路由key
     *
     * @return
     */
    @Bean
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                //声明队列属性有更改时需要删除队列
                //给队列设置消息时长
                //.ttl(10000)
                //队列最大长度
                .maxLength(1)
                .build();
    }

    /**
     * 绑定队列和交换机,同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
     *
     * @param bootExchange
     * @param bootQueue
     * @return
     */
    @Bean
    public Binding bootBind(Exchange bootExchange, Queue bootQueue) {
        return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
    }

}

监听死信队列

    @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
    public void listener_dead(String msg, Channel channel, Message message) throws IOException {
        System.out.println("死信接收到消息" + msg);
        System.out.println("唯一标识:" + message.getMessageProperties().getCorrelationId());
        System.out.println("messageID:" + message.getMessageProperties().getMessageId());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

方式一——消费者拒绝&否认

  • 拒绝消息
    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void listener(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到消息" + msg);
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false)
    }
  • 否认消息
    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void listener(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到消息" + msg);
 		channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }

方式二——超过消息TTL

  • 发送消息时设置TTL
@SpringBootTest
public class Publisher {

    @Autowired
    private RabbitTemplate template;
        /**
     * 5秒未被消费会路由到死信队列
     */
    @Test
    public void publish_expir() {
        template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> {
            message.getMessageProperties().setExpiration("5000");
            return message;
        });
    }
}
  • 设置队列所有消息的TTL
    更新RabbitMQConfig类中bootQueue() ,更新后需要删除队列,因为队列属性有更改
    @Bean
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                //声明队列属性有更改时需要删除队列
                //给队列设置消息时长
                .ttl(10000)
                .build();
    }

方式三——超过队列长度限制

设置队列长度限制,当队列长度超过设置的阈值,消息便会路由到死信队列。

    @Bean
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                //声明队列属性有更改时需要删除队列
                .maxLength(1)
                .build();
    }

代码仓库

点我

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/90094.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spark的运行模式介绍

Spark的运行模式 本地模式&#xff08;Local&#xff09; 一般用做测试&#xff0c;测试代码的逻辑是否正确 本地模式&#xff0c;只启动一个Driver进程&#xff0c;没有Executor进程的&#xff0c;所有Task都运行在Driver进程中 集群模式 &#xff08;Cluster&#xff09; 一…

医疗挂号网站

开发工具(eclipse/idea/vscode等)&#xff1a; 数据库(sqlite/mysql/sqlserver等)&#xff1a; 功能模块(请用文字描述&#xff0c;至少200字)&#xff1a; 管理员功能&#xff1a; 1、管理挂号须知、帮助信息 2、增删改查资讯类型、健康资讯信息 3、增删改查医生职称信息、医生…

装载问题 ——回溯法(Java)

装载问题 ——回溯法&#xff08;Java&#xff09; 文章目录装载问题 ——回溯法&#xff08;Java&#xff09;1、 问题描述1.1 装载问题1.2 转换问题2、算法设计2.1 可行性约束函数2.2 上界函数2.3 解空间树2.4 剪枝函数2.5 算法设计3、程序代码4、参考资料1、 问题描述 有一…

Hadoop安装准备

虚拟机的安装 配置了静态IP地址&#xff08;192.168.1.100&#xff09; 关闭与禁用了防火墙 安装了vim编辑器 虚拟机克隆 克隆出master虚拟机 以同样的步骤克隆出slave1和slave2 虚拟机配置 配置master虚拟机 启动虚拟机 设置主机名 命令&#xff1a;hostname…

C#语言和面向对象OOP

1、【重点面试题】面向对象的三大特性 封装 &#xff1a;隐藏对象的属性&#xff0c;并实现细节&#xff08;方法&#xff09;&#xff0c;对外提供接口&#xff0c; public全局&#xff0c;protected子类&#xff0c;internal同集&#xff0c;隐藏private 同类&#xff0c;pub…

英文文章写作|文献管理|​​​​​​​阅读文献|引用文献|国内文章

目录 英文文章写作 1.阅读10篇文献&#xff0c;总结100个常用句型和常用短语 2.找3-5篇技术路线和统计方法与你的课题接近的文章&#xff0c;精读 3.针对论文的每一部分&#xff0c;尤其是某种具体方法、要讨论的某一具体方面&#xff0c;各找5-8 篇文献阅读&#xff0c;充…

使用HTMLTestRunner.py生成测试报告

1、如何收集测试结果&#xff1f; 使用第三方封装好类HTMLTestRunner.py生成HTML测试报告 # encoding:utf-8 import unittest import time from HTMLTestRunner import HTMLTestRunner class MyTestCase(unittest.TestCase): # 每条用例初始化 def setUp(s…

大数据技术之Spark基础解析

大数据技术之Spark基础解析 第1章 Spark概述 1.1什么是Spark 什么是Spark? 大数据的电花火石。 Spark类似于MapReduce的低延迟的交互式计算框架。 Spark是UC Berkeley AMPLab开发的是一种计算框架&#xff0c;分布式资源工作交由集群管理软件&#xff08;Mesos、YARN&am…

Unity Addressables资源管理 打包路径设置

1.全局路径设置窗口的菜单位置 或 2.窗口界面 初始&#xff1a; Local&#xff1a;本地路径 Remote&#xff1a;远程路径 build指资源包生成位置 Load指资源包加载路径 BuildTarget:一个路径变量 Built-In 是内置默认的本地路径EditorHosted 则是编辑器和【托管服务】一起使…

零代码是什么?

上有国家层面的数字中国&#xff0c;数字经济的顶层规划&#xff0c;下有信息化飞速发展让组织、个人都深切体会到了信息技术带给生活的便利&#xff0c;如今信息化技术领域热度很高的低代码&#xff08;LowCode&#xff09;和零代码&#xff08;No-Code&#xff09;又开始进入…

太强了,GitHub白嫖的SpringCloud微服务进阶宝典,啃完吊打面试官

前言 自 2014 年起&#xff0c;微服务技术一直火热至今。随着越来越完善的微服务技术栈的发布&#xff0c;以及越来越多的微服务项目实际的落地和上线&#xff0c;使用 Java 技术栈的企业应该都在尝试或者已经落地了各自的微服务项目。同时&#xff0c;通过招聘网站的信息和每…

C# CallerMemberName,CallerFilePath,CallerLineNumber的使用

总目录 文章目录总目录前言一、作用二、使用1.案例三、使用场景总结前言 本文主要介绍CallerMemberName&#xff0c;CallerFilePath&#xff0c;CallerLineNumber的使用。 一、作用 本文将介绍的三个特性作用如下&#xff1a; CallerMemberName 允许获取方法调用方的方法或属…

基于STM32与PCA9685制作四足机器人(代码开源)

前言&#xff1a;本文为手把手教学基于STM32的四足机器人项目——JDY-31蓝牙控制&#xff0c;特别地&#xff0c;本次项目采用的是STM32作为MCU。四足机器人的支架为3D打印件&#xff0c;SG90舵机驱动机器人实现姿态运动。借助PCA9685舵机驱动板实现12路PWM波控制&#xff0c;更…

基于java+springboot+mybatis+vue+mysql的留守儿童爱心网站

项目介绍 随着留守儿童爱心管理的不断发展&#xff0c;留守儿童爱心网站在现实生活中的使用和普及&#xff0c;留守儿童爱心管理成为近年内出现的一个热门话题&#xff0c;并且能够成为大众广为认可和接受的行为和选择。设计留守儿童爱心网站的目的就是借助计算机让复杂的管理…

[附源码]Node.js计算机毕业设计二手图书回收销售网站Express

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

程序调试:日常经验总结(一)

程序调试&#xff1a;日常经验总结一&#xff1a;如何快速的去查询一个类甚至是一个jar包中的class文件&#xff1f;二&#xff1a;如何快速找到本地项目编译之后的字节码文件三&#xff1a;本地启动小实例绑定同一个端口时候发生的报错。一&#xff1a;如何快速的去查询一个类…

[附源码]Python计算机毕业设计防疫卫生资讯推荐系统Django(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等…

taro 兼容支付宝小程序和微信小程序<九>---判断是否是开发者工具/开发版/体验版/正式版/测试环境/正式环境

项目&#xff1a; taro3 vue3 判断是支付宝/微信/H5 支付宝 -> process.env.TARO_ENV ‘alipay’ 微信 -> process.env.TARO_ENV ‘weapp’ H5 -> process.env.TARO_ENV ‘h5’ 判断是否是开发者工具 支付宝 -> my.isIDE 微信 -> Taro.getSystemInfoSyn…

[附源码]Python计算机毕业设计-高校科研信息管理系统Django(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等…

求最小生成树Prim(普里姆)和Kruskal(克鲁斯卡尔)算法

想求最小生成树&#xff0c;我们首先得弄懂以下几个概念 连通图:图中任意两个顶点都是连通的 极小连通子图:既要保持图连通又要使得边数最少的子图 生成树: 包含图中全部顶点的一个极小连通子图 连通图用通俗的话来讲就是&#xff0c;某一个顶点&#xff0c;可以直接或者间接…