SpringBoot 整合 RabbitMQ 实现延迟消息

news2025/1/12 22:45:31

一、业务场景说明

用于解决用户下单以后,订单超时如何取消订单的问题。

  • 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
  • 生成订单,获取订单的id;
  • 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
  • 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
  • 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。

相关概念:(死信队列与延迟队列)

        死信,顾名思义就是无法被消费的消息,一般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进行消费,但是某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列

        死信队列有其特殊的应用场景,例如用户在商城下单成功并点击去支付的时候,如果在指定的时间内未支付,那么就可以将该下单消息投递到死信队列中,至于后续怎么处理死信队列需要结合具体的应用场景。

死信队列(Dead Letter Queue, DLQ)

目的: 处理无法成功消费的消息。

特点:

  1. 目的: 用于接收无法成功处理的消息。这些消息可能因为多种原因无法被正常消费,例如消息处理失败、消息超时、队列满等。
  2. 触发条件: 当消息在主队列中无法被正常消费,并且达到一定的失败次数或过期时间后,会被转发到死信队列。
  3. 用途: 通过分析死信队列中的消息,开发者可以排查和解决系统中的问题,也可以进行后续的审计和处理。

实现方式:

  • 需要配置主队列和死信队列之间的关系。
  • 配置消息的重试策略和死信转发条件(如重试次数、过期时间等)。

延迟队列(Delayed Queue)

目的: 控制消息的处理时间。

特点:

  1. 目的: 在特定的时间点或延迟一段时间后再将消息发送到主队列进行处理。用于在系统中实现消息的延迟处理。
  2. 触发条件: 当消息被发送到延迟队列时,它会根据设定的延迟时间在未来某个时刻被转发到主队列。
  3. 用途: 常用于实现任务的延迟执行、定时任务、超时重试等功能。

实现方式:

  • 使用专门支持延迟队列的消息队列系统,或者通过设置消息的过期时间和重试机制来实现。
  • 可以配置延迟时间,确保消息在设定的时间后进入主队列。

总结

  • 死信队列: 处理无法消费的消息,通常用于异常处理和问题排查。
  • 延迟队列: 控制消息的处理时间,允许消息在未来某个时间点才被处理。

二、整合RabbitMQ实现延迟消息

整合依赖及配置

●在pom.xml中添加AMQP相关依赖;

<!--Spring AMQP相关依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 修改application.yml文件,在spring节点下添加RabbitMQ相关配置。
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /mall
    username: mall
    password: mall
    publisher-returns: true #消息发送到队列确认
    publisher-confirm-type: simple #消息发送到交换器确认

实现延迟消息

  • 添加消息队列的枚举配置类QueueEnum,用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称;
/**
 * @description 消息队列枚举配置
 */
@Getter
public enum QueueEnum {
    /**
     * 消息通知队列
     */
    QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
    /**
     * 消息通知ttl队列
     */
    QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");

    /**
     * 交换机名称
     */
    private String exchange;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 路由键
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
}
  • 添加RabbitMQ的Java配置,用于配置交换机、队列及队列与交换机的绑定关系;
/**
 * @description 消息队列配置
 */
@Configuration
public class RabbitMqConfig {

  /**
   * 订单消息实际消费队列所绑定的交换机
   */
  @Bean
  DirectExchange orderDirect() {
    return ExchangeBuilder
            .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
            .durable(true)
            .build();
  }

  /**
   * 订单延迟队列所绑定的交换机
   */
  @Bean
  DirectExchange orderTtlDirect() {
    return ExchangeBuilder
            .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
            .durable(true)
            .build();
  }

  /**
   * 订单实际消费队列
   */
  @Bean
  public Queue orderQueue() {
    return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
  }

  /**
   * 订单延迟队列(死信队列)
   */
  @Bean
  public Queue orderTtlQueue() {
    return QueueBuilder
            .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
            .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
            .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
            .build();
  }

  /**
   * 将订单队列绑定到交换机
   */
  @Bean
  Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
    return BindingBuilder
            .bind(orderQueue)
            .to(orderDirect)
            .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
  }

  /**
   * 将订单延迟队列绑定到交换机
   */
  @Bean
  Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
    return BindingBuilder
            .bind(orderTtlQueue)
            .to(orderTtlDirect)
            .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
  }

}
  • 此时登录,在RabbitMQ管理页面可以看到以下交换机和队列;

  • 交换机及队列说明如下:mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel(取消订单消息消费队列)。
  • 添加延迟消息的发送者CancelOrderSender,用于向订单延迟消息队(mall.order.cancel.ttl)里发送消息;
/**
 * @description 取消订单消息的发出者
 */
@Component
public class CancelOrderSender {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId,final long delayTimes){
        //给延迟队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            }
        });
        LOGGER.info("send delay message orderId:{}",orderId);
    }
}
  • 添加取消订单消息的接收者CancelOrderReceiver,用于从取消订单的消息队列(mall.order.cancel)里接收消息;
/**
 * @description 取消订单消息的处理者
 */
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
    @Autowired
    private OmsPortalOrderService portalOrderService;
    @RabbitHandler
    public void handle(Long orderId){
        LOGGER.info("receive delay message orderId:{}",orderId);
        portalOrderService.cancelOrder(orderId);
    }
}
  • 添加OmsPortalOrderService接口;
/**
 * @description 前台订单管理Service
 */
public interface OmsPortalOrderService {

    /**
     * 根据提交信息生成订单
     */
    @Transactional
    CommonResult generateOrder(OrderParam orderParam);

    /**
     * 取消单个超时订单
     */
    @Transactional
    void cancelOrder(Long orderId);
}
  • 添加OmsPortalOrderService的实现类OmsPortalOrderServiceImpl;
/**
 * @description 前台订单管理Service
 */
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
    private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
    @Autowired
    private CancelOrderSender cancelOrderSender;

    @Override
    public CommonResult generateOrder(OrderParam orderParam) {
        //todo 执行一系类下单操作,具体参考mall项目
        LOGGER.info("process generateOrder");
        //下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
        sendDelayMessageCancelOrder(11L);
        return CommonResult.success(null, "下单成功");
    }

    @Override
    public void cancelOrder(Long orderId) {
        //todo 执行一系类取消订单操作,具体参考mall项目
        LOGGER.info("process cancelOrder orderId:{}",orderId);
    }

    private void sendDelayMessageCancelOrder(Long orderId) {
        //获取订单超时时间,假设为60分钟
        long delayTimes = 30 * 1000;
        //发送延迟消息
        cancelOrderSender.sendMessage(orderId, delayTimes);
    }

}
  • 添加OmsPortalOrderController定义接口;
/**
 * @description 订单管理Controller
 */
@Controller
@Api(tags = "OmsPortalOrderController")
@Tag(name = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {
    @Autowired
    private OmsPortalOrderService portalOrderService;

    @ApiOperation("根据购物车信息生成订单")
    @RequestMapping(value = "/generateOrder", method = RequestMethod.POST)
    @ResponseBody
    public Object generateOrder(@RequestBody OrderParam orderParam) {
        return portalOrderService.generateOrder(orderParam);
    }
}

延迟消息功能演示

  • 运行项目,访问Swagger API文档,访问地址:http://localhost:8080/swagger-ui/

可以看到,经过了30秒后延迟消息发送出去了 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2035217.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python语言day5 MD5 json

md5&#xff1a; python提供了内置的md5加密功能&#xff0c;使用md5模拟一个小项目&#xff1a; 注册&#xff1a; 启动py程序&#xff0c;在控制台界面提示用户输入用户名及密码&#xff1b; 使用md5加密 密码&#xff1b; 创建txt文件记录输入的用户名 和密文。 登录&…

访问网站显示不安全打不开怎么办如何处理

当访问网站时浏览器提示“不安全”&#xff0c;这通常是由于多种原因造成的。下面是一些常见的原因及其解决办法&#xff1a; 未启用HTTPS协议 如果网站仅使用HTTP协议&#xff0c;数据传输没有加密&#xff0c;会被浏览器标记为“不安全”。解决办法是启用HTTPS协议&#xff…

vue3-02-vue3中的组件通信

目录 组件通信一、vue3组件通信和vue2的区别二、父子通信2.1 props通信1&#xff09;父→子传递数据&#xff08;父组件向子组件传递数据&#xff09;2&#xff09;子→父传递数据 2.2 v-model1&#xff09;v-model的本质2&#xff09;给modelValue起别名3&#xff09;$event 2…

【资源】wordpress 子比主题

简介 子比主题是一款功能强大的WordPress主题模板&#xff0c;支持社区论坛、商城、支付、古腾堡编辑器等多种功能。很多资源类网站都是基于此搭建的。搭建后的效果基本上和官网一致&#xff0c;可查看官网的演示效果。 官方网站&#xff1a;https://www.zibll.com/ 如要获取…

精酿啤酒的酿造过程 你喝的不仅仅是酒

大家下午好呀&#xff01;今天我要带大家一起探索精酿啤酒的神秘世界&#xff01;&#x1f31f;&#x1f31f; 第一步&#xff1a;原料准备 精酿啤酒的四大原料&#xff1a;麦芽、啤酒花、水和酵母。 别小看这些原料&#xff0c;它们的品质直接决定了啤酒的风味。&#x1f33e;…

网络协议七 应用层 HTTP 协议

应用层常见的协议 HTTP协议 一. 如何查看我们的http 协议全部的内容有哪些呢&#xff1f; 一种合理的方法是 通过 wireshark 软件&#xff0c;找到想要查看的HTTP --->追踪流--->HTTP流 来查看 结果如下&#xff1a;红色部分 为 发送给服务器的&#xff0c;蓝色部分为服…

【Qt开发】QtCharts图表——在ui上添加QChartView控件并进行绘图配置

【Qt开发】QtCharts图表——在ui上添加QChartView控件并进行绘图配置 文章目录 控件安装和模块导入在ui上添加QChartView控件QChartView图表配置附录&#xff1a;C语言到C的入门知识点&#xff08;主要适用于C语言精通到Qt的C开发入门&#xff09;C语言与C的不同C中写C语言代码…

Stable Diffusion 必备插件推荐,菜鸟轻松成高手!

前言 一个刚学AI绘画的小菜鸟如何快速成为Stable Diffusion高手&#xff1f;答案就是SD插件。 只要学会使用SD的各种插件&#xff0c;帮你写正向和负向提示词&#xff0c;修复人脸/身体/手指&#xff0c;高清放大图片&#xff0c;指定人物pose&#xff0c;图片微调等等都可以…

YOLO系列算法解析

一、深度学习算法概述 1、不同阶段算法优缺点分析 One-stage: 优点&#xff1a;速度非常快&#xff0c;适合做实时监测任务 缺点&#xff1a;效果通常不好 2、yolo评价指标 yolo评价指标&#xff1a;map和fps Map指标&#xff1a;综合衡量检测效果 精度&#xff1a;识别准确率…

代码随想录 day 37 动态规划

第九章 动态规划 part05 力扣上没有纯粹的完全背包的题目&#xff0c;我在卡码网上制作了题目&#xff0c;大家可以去做一做&#xff0c;题目链接在下面的文章链接里。 后面的两道题目&#xff0c;都是完全背包的应用&#xff0c;做做感受一下 完全背包 视频讲解&#xff1a…

这些错误都没遇到过,还敢说你做过自动化测试?!

在执行冒烟测试、回归测试或多浏览器兼容性测试时&#xff0c;利用web自动化测试可以显著节省人力成本&#xff0c;因此web自动化测试的价值非常大。然而&#xff0c;任何从事过web自动化测试的人都会有这样的体会:写自动化代码相对简单&#xff0c;但维护的成本却非常高。一日…

若依服务器上云部署

准备条件&#xff1a; 安装好mysql和redis并配置好密码。 1.安装JDK&#xff0c;我这里使用的是1.8 wget --no-check-certificate --no-cookies --header "Cookie: oraclelicenseaccept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u131-b11…

40 - asctime()函数

文章目录 1 函数原型2 参数3 返回值4 示例4.1 示例14.2 示例2 1 函数原型 asctime()&#xff1a;时间类型转换&#xff0c;函数原型如下&#xff1a; char* asctime (const struct tm * timeptr);ctime()库描述如下&#xff1a; Convert tm structure to string 1. Interpre…

MySQL4 多表查询 内连接

内连接 多表查询内连接 多表查询 数据准备 CREATE DATABASE db4; USE db4; -- 创建部门表 create table if not exists dept(deptno varchar(20) primary key , -- 部门号name varchar(20) -- 部门名字 );-- 创建员工表 create table if not exists emp(eid varchar(20) pr…

【文件IO】文件系统操作

文章目录 基本操作概述1. 文件属性2. 文件构造方法3. 文件方法1. 文件创建2. 文件删除3. 查看目录下所有的文件名4. 遍历目录5. 创建目录5. 目录重命名 基本操作概述 创建文件删除文件创建目录重命名文件判定文件存在… Java 中&#xff0c;提供了一个 File 类&#xff0c;进…

电商平台的推荐算法需要备案吗?

答案是肯定的&#xff01; 政策要求&#xff1a; 根据我国《互联网信息服务算法推荐管理规定》&#xff08;以下简称《规定》&#xff09;第六条&#xff0c;具有舆论属性或社会动员能力的互联网信息服务&#xff0c;包括电商平台的推荐算法&#xff0c;需要进行备案。 电商平…

使用js和css 实现div旋转围绕圆分布排列

记录&#xff0c;以防忘记 围绕圆 import React, { useEffect } from react; import ./index.scoped.scss;const Test () > {const arr Array.from({ length: 28 }, (_, index) > index 1);useEffect(() > {const dayTotal arr.length;// 动态设置每个点的旋转角…

快速找出问题快件:批量查询与筛选技巧

在日常生活中&#xff0c;我们经常需要查询大量的快递信息。尤其在电商、物流等行业&#xff0c;快速、准确地查询和筛选快递信息至关重要。固乔快递查询助手是一款强大的工具&#xff0c;能帮助用户批量查询快递&#xff0c;并快速筛选出问题快件。下面我们将详细介绍如何使用…

opengl创建柱面和鱼眼重展uv

专业软件 先看一下专业软件 可以拉取很多的uv点 以下是使用 OpenGL 创建不规则面片并指定 UV 的一般步骤&#xff1a; 1 顶点数据准备 2 定义面片的顶点坐标。这些顶点构成了面片的形状。 3 为每个顶点指定对应的纹理坐标&#xff08;UV&#xff09;。 4 创建顶点缓冲区对象…

Springboot整合hutool验证码

在 Spring Boot 中&#xff0c;你可以将 Hutool 生成验证码的功能集成到 RESTful API 接口中。 依赖 <dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.14</version> <!-- 使用最新版…