SpringBoot整合RabbitMQ实现消息延迟队列

news2025/1/12 1:59:12

环境依赖

SpringBoot 3.1.0

JDK 17

前期准备

安装MQ:  liunx+docker+rabbitmq安装延迟队列插件

实例

实现延迟队列的一种方式是在 RabbitMQ 中使用消息延迟插件,这个插件可以让你在消息发送时设置一个延迟时间,超过这个时间后消息才会被消费者接收到。下面是 SpringBoot 整合 RabbitMQ 实现延迟队列的简单步骤:

1.添加 RabbitMQ 的 Maven 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置 RabbitMQ

在 application.properties 配置文件中添加 RabbitMQ 的连接信息:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtual-host=/
# 手动应答
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#每次从队列中取一个,轮询分发,默认是公平分发
spring.rabbitmq.listener.simple.prefetch=1
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5

3.配置文件

@Configuration
public class RabbitMQOrderConfig {

    /**
     * 订单交换机
     */
    public static final String ORDER_EXCHANGE = "order_exchange";
    /**
     * 订单队列
     */
    public static final String ORDER_QUEUE = "order_queue";
    /**
     * 订单路由key
     */
    public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";

    /**
     * 死信交换机
     */
    public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
    /**
     * 死信队列 routingKey
     */
    public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";

    /**
     * 死信队列
     */
    public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";

    /**
     * 延迟时间 (单位:ms(毫秒))
     */
    public  static final Integer DELAY_TIME = 10000;


    /**
     * 创建死信交换机
     */
    @Bean("orderDeadLetterExchange")
    public Exchange orderDeadLetterExchange() {
        return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
    }

    /**
     * 创建死信队列
     */
    @Bean("orderDeadLetterQueue")
    public Queue orderDeadLetterQueue() {
        return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
    }

    /**
     * 绑定死信交换机和死信队列
     */
    @Bean("orderDeadLetterBinding")
    public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
    }


    /**
     * 创建订单交换机
     */
    @Bean("orderExchange")
    public Exchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE, true, false);
    }

    /**
     * 创建订单队列
     */
    @Bean("orderQueue")
    public Queue orderQueue() {
        Map<String, Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);

        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);

        //过期时间,单位毫秒
        args.put("x-message-ttl", DELAY_TIME);

        return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
    }

    /**
     * 绑定订单交换机和队列
     */
    @Bean("orderBinding")
    public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
    }
}

4.定义消息实体类

定义一个消息体类,用来存储需要发送的消息:

@Slf4j
@Data
@Builder
public class OrderMessage implements Serializable {

    /**
     * 商户订单号
     */
    private String orderId;

    /**
     * 支付宝订单号
     */
    private String tradeNo;
}

5.定义消息发送者

定义一个 RabbitMQ 消息发送者类,用来发送消息到 RabbitMQ:

@Slf4j
@Component
public class MessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendOrderMessage(OrderMessage message) {
        //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定
        rabbitTemplate.setMandatory(true);
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
        rabbitTemplate.setReturnsCallback(returned -> {
            int code = returned.getReplyCode();
            System.out.println("code=" + code);
            System.out.println("returned=" + returned);
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", message);

        log.info("===============延时队列生产消息====================");
        log.info("发送时间:{},发送内容:{}, {}ms后执行", LocalDateTime.now(), message, RabbitMQConfig.DELAY_TIME);
    }
}

6.定义消息消费者

定义一个 RabbitMQ 消息消费者类,用来接收并处理消息:

@Component
@Slf4j
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
public class OrderMQListener {

    @RabbitHandler
    public void consumer(OrderMessage orderMessage, Message message, Channel channel) throws IOException {
        log.info("收到消息:{}",new Date());
        log.info("msgTag:{}", message.getMessageProperties().getDeliveryTag());
        log.info("message:{}", message);
        log.info("content:{}", orderMessage);
    }
}

这里使用了 @RabbitListener 注解来将一个方法标记为一个 RabbitMQ 消息监听器,通过设置 queues 属性来指定监听的队列名称。

7.定义一个controller

@Slf4j
@Api(tags = "延迟消息接口")
@RestController
@RequestMapping("/rabbitmq_order_delay_message")
public class RabbitMQDelayMessageController {

    @Autowired
    private MessageSender sender;

    /**
     * 发送消息
     * @return
     */
    @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)
    @ResponseBody
    public void sendMsg() {
        OrderMessage orderMessage = OrderMessage.builder().orderId(UUID.randomUUID().toString()).tradeNo(UUID.randomUUID().toString()).build();
        sender.sendOrderMessage(orderMessage);
    }
}

启动项目,请求运行结果:

总的xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
    <groupId>com.xiaoleilu</groupId>
    <artifactId>hutool-all</artifactId>
    <version>3.0.7</version>
</dependency>

<dependency>
    <groupId>io.swagger</groupId>
    <artifactId>swagger-annotations</artifactId>
    <version>${swagger-annotations.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.73</version>
    <scope>compile</scope>
</dependency>

问题总结

1.Invalid argument, ‘x-delayed-type’ must be an existing exchange type

需要创建一个交换机

2.Connection refused: no further information

请检查配置 application.xml配置的rabbimq不生效,可以将配置放到application.properties

3.Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
这种情况:

1.消费者内部重复签收导致签收异常

​    解决方案:增加配置手动处理应答

        1.配置新增

spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动签收

        2.代码里: 增加channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    public void consumer(String body, Message message, Channel channel) throws IOException {
        long msgTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("收到消息:" + new Date());
            System.out.println("msgTag=" + msgTag);
            System.out.println("message=" + message);
            System.out.println("body=" + body);
            channel.basicAck(msgTag, false);
        }catch (Exception e) {
            log.error("【订单延迟关闭处理异常】 接收到消息为:" + msgTag + " ,消息异常消费 : ", e);
        } finally {
            // 处理完之后手动签收(这里再次签收)
            channel.basicAck(msgTag, false);
        }
    }

2.已经是自动处理了,然后代码里还有手动处理channel.basicAck(msgTag, false)

​ 解决方案:去除channel.basicAck(msgTag, false)

4.Failed to convert message

消息发送和接收的方式不对 比如发送的是对象,则接收的也必须是对象,发送的是string ,接收的也必须是string

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1805594.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

期刊影响因子、分区如何查询

查询期刊影响因子、分区等信息就不得不说到的数据库JCI(Journal Citation Indicator)。 JCR 是一个综合性、多学科的期刊分析与评价报告&#xff0c;它客观地统计Web of Science收录期刊所刊载论文的数量、论文参考文献的数量、论文的被引用次数等原始数据&#xff0c;再应用文…

数据库(29)——子查询

概念 SQL语句中嵌套SELECT语句&#xff0c;称为嵌套查询&#xff0c;又称子查询。 SELECT * FROM t1 WHERE column1 (SELECT column1 FROM t2); 子查询外部语句可以是INSERT/UPDATE/DELETE/SELECT的任何一个。 标量子查询 子查询返回的结果是单个值&#xff08;数字&#xff…

基于xml的Spring应用(理解spring注入)

目录 问题&#xff1a; 传统Javaweb开发的困惑? 问题&#xff1a; IOC、DI和AOP的思想提出 问题&#xff1a; Spring框架的诞生 1. BeanFactory快速入门 2. ApplicationContext快速入门 3. BeanFactory和ApplicationContext的关系 基于xml的Spring应用 1. SpringBean的…

c# 开发的wpf程序闪退,无法用try catch捕获异常

之前开发的一个程序是c#wpf开发&#xff0c;基于.net framework 4.6.1的&#xff0c;一切都是正常的&#xff0c;但是在我重新装了win11后在程序logo出现后直接闪退&#xff0c;报错 返回值为 -1073740791 (0xc0000409)&#xff0c;而且定位到代码时发现是&#xff0c; publi…

AI助教时代:通义千问,让学习效率翻倍?

全文预计1100字左右&#xff0c;预计阅读需要5分钟。 关注AI的朋友知道&#xff0c;在今年5月份以及6月份的开端&#xff0c;AI行业可谓是风生水起&#xff0c;给了我们太多的惊喜和震撼&#xff01;国内外各家公司纷纷拿出自己憋了一年的产品一决雌雄。 国内有文心一言、通义千…

LeetCode 算法:最大子数组和c++

原题链接&#x1f517;&#xff1a;最大子数组和 难度&#xff1a;中等⭐️⭐️ 题目 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组是数组中的一个连续部分。 …

28-LINUX--I/O复用-epoll

一.epoll概述 epoll 是 Linux 特有的 I/O 复用函数。它在实现和使用上与 select、poll 有很大差异。首 先&#xff0c;epoll 使用一组函数来完成任务&#xff0c;而不是单个函数。其次&#xff0c;epoll 把用户关心的文件描述 符上的事件放在内核里的一个事件表中。从而无需像…

计算机组成原理之指令格式

1、指令的定义 零地址指令&#xff1a; 1、不需要操作数&#xff0c;如空操作、停机、关中断等指令。 2、堆栈计算机&#xff0c;两个操作数隐藏在栈顶和此栈顶&#xff0c;取两个操作数&#xff0c;并运算的结果后重新压回栈顶。 一地址指令&#xff1a; 二、三地址指令 四…

C# WPF入门学习主线篇(十六)—— Grid布局容器

C# WPF入门学习主线篇&#xff08;十六&#xff09;—— Grid布局容器 欢迎来到C# WPF入门学习系列的第十六篇。在前几篇文章中&#xff0c;我们已经探讨了 Canvas、StackPanel、WrapPanel 和 DockPanel 布局容器及其使用方法。本篇博客将介绍另一种功能强大且灵活的布局容器—…

Spring AI 第二讲 之 Chat Model API 第四节Amazon Bedrock

Amazon Bedrock是一项托管服务&#xff0c;通过统一的应用程序接口提供来自不同人工智能提供商的基础模型。 Spring AI 通过实现 Spring 接口 ChatModel、StreamingChatModel 和 EmbeddingModel&#xff0c;支持亚马逊 Bedrock 提供的所有聊天和嵌入式 AI 模型。 此外&#xf…

【Python报错】已解决TypeError: ufunc ‘isnan’ not supported for the input types

成功解决“TypeError: ufunc ‘isnan’ not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ‘‘safe’’”错误的全面指南 在使用NumPy等科学计算库时&#xff0c;我们经常会遇到各种各样…

pytorch-数据增强

目录 1. Flip翻转2. Rotate旋转3. scale缩放4. crop裁剪5. 总结6. 完整代码 1. Flip翻转 上图中做了随机水平翻转和随机垂直翻转&#xff0c;翻转完成后转化成tensor 2. Rotate旋转 上图中作了2次旋转第一次旋转角度在-15<0<15范围内&#xff0c;随机出一个角度&#xf…

创建google cloud storage notification 的权限问题

问题 根据google 的文档&#xff1a; https://cloud.google.com/storage/docs/reporting-changes#command-line 明确表示&#xff0c; 要创建storage notificaiton &#xff0c; 创建者(or service account) 只需要bucket 和 pubsub admin roles 但是实际上我在公司尝试为1个…

【AI 高效问答系统】机器阅读理解实战内容

⭐️我叫忆_恒心&#xff0c;一名喜欢书写博客的研究生&#x1f468;‍&#x1f393;。 如果觉得本文能帮到您&#xff0c;麻烦点个赞&#x1f44d;呗&#xff01; 近期会不断在专栏里进行更新讲解博客~~~ 有什么问题的小伙伴 欢迎留言提问欧&#xff0c;喜欢的小伙伴给个三连支…

【spark】spark列转行操作(json格式)

前言&#xff1a;一般我们列转行都是使用concat_ws函数或者concat函数&#xff0c;但是concat一般都是用于字符串的拼接&#xff0c;后续处理数据时并不方便。 需求&#xff1a;将两列数据按照设备id进行分组&#xff0c;每个设备有多个时间点位和对应值&#xff0c;将其一一对…

企业网页制作

随着互联网的普及&#xff0c;企业网站已成为企业展示自己形象、吸引潜在客户、开拓新市场的重要方式。而企业网页制作则是构建企业网站的基础工作&#xff0c;它的质量和效率对于企业网站的成败至关重要。 首先&#xff0c;企业网页制作需要根据企业的特点和需求进行规划。在网…

Springboot使用webupload大文件分片上传(包含前后端源码)

Springboot使用webupload大文件分片上传&#xff08;包含源码&#xff09; 1. 实现效果1.1 分片上传效果图1.2 分片上传技术介绍 2. 分片上传前端实现2.1 什么是WebUploader&#xff1f;功能特点接口说明事件APIHook 机制 2.2 前端代码实现2.2.1&#xff08;不推荐&#xff09;…

ssm汽车在线销售系统

摘 要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;管理工作的重要性已逐渐被人们所认识&#xff0c;科学化的管理&#xff0c;使信息存…

python中使用 Matplotlib 的 GridSpec 来实现更复杂的布局控制

matplotlib.gridspec 是 Matplotlib 库中的一个模块&#xff0c;用于创建复杂的子图布局。GridSpec 提供了更精细的控制&#xff0c;允许你定义不同大小和位置的子图。下面是对 GridSpec 的详细介绍和一些常见用法示例&#xff1a; 1. 基本用法 GridSpec 类似于表格布局&…

R语言数据分析16-针对芬兰污染指数的分析与考察

1. 研究背景及意义 近年来&#xff0c;随着我国科技和经济高速发展&#xff0c;人们生活质量也随之显著提高。但是&#xff0c; 环境污染问题也日趋严重&#xff0c;给人们的生活质量和社会生产的各个方面都造成了许多不 利的影响。空气污染作为环境污染主要方面&#xff0c;更…