延迟队列实现订单超时自动取消

news2025/1/19 17:07:36

在上一篇 Java 实现订单未支付超时自动取消,使用Java自带的定时任务TimeTask实现订单超时取消,但是有小伙伴提出这种实现,会有以下几个问题:

  • 线上服务挂了,导致服务下所有的定时任务失效。
  • 服务重启,定时任务也会失效。
  • 服务上线需要发布新的服务,原来服务也会关闭。

针对上述服务挂了、或者服务重启导致消息失效的问题,需要使用独立于项目的服务,比如消息中间件,比如Redis或者RabbitMQ。本文主要讲解消息队列RabbitMQ

实现效果

创建一个订单,超时30分钟未支付就取消订单。

RabbitMQ本身是不支持延迟队列的,但可以利用RabbitMQ存活时间 + 死信队列来实现消息延迟。

TTL + DLX

存活时间 TTL

TTL全称为:time to live,意思为存活时间,当消息没有配置消费者,消息就一直停留在队列中,停留时间超过存活时间后,消息会被自动删除

RabbitMQ支持两种TTL设置:

  • 对消息本身设置存活时间,每条消息的存活时间可以灵活设置为不同的存活时间。
  • 对传递的队列设置存活时间,每条传到到队列的过期时间都一致。

当消息过期还没有被消费,此时消息会变成死信消息dead letter这是实现延迟队列的关键

消息变为死信的条件:

  • 消息被拒绝basic.reject/basic.nack,并且requeue=false
  • 消息的过期时间到期了。
  • 队列达到最大长度。

死信交换机 DLX

当上面的消息变成死信消息之后,它不会立即被删除,首先它要看有没有对应的死信交换机,如果有绑定的死信交换机,消息就会从发送到对应的死信交换机上。

DLX全程为Dead Letter Exchanges,意思为死信交换机。

死信交换机和普通交换机没什么区别,不同的是死信交换机会绑定在其他队列上,当队列的消息变成死信消息后,死信消息会发送到死信交换上。

队列绑定死信交换机需要两个参数:

  • x-dead-letter-exchange: 绑定的死信交换机名称。
  • x-dead-letter-routing-key: 绑定的死信交换机routingKey

死信交换机和普通交换机的区别就是死信交换机的ExchangeroutingKey作为绑定参数,绑定在其他队列上。

项目实战

消息发送的流程图:

  • 生产者将带有TTL的消息发送给交换机,由交换机路由到队列中。
  • 队列由于没有消费,消息一直停留在队列中,一直等到消息超时,变成死信消息。
  • 死信消息转发到死信交换机在路由到死信队列上,最后给消费者消费。

创建死信队列

@Configuration
public class DelayQueueRabbitConfig {
  // 下面是死信队列
	/**
	 * 死信队列
	 */
	public static final String DLX_QUEUE = "queue.dlx";

	/**
	 * 死信交换机
	 */
	public static final String DLX_EXCHANGE = "exchange.dlx";

	/**
	 * 死信routing-key
	 */
	public static final String DLX_ROUTING_KEY = "routingKey.dlx";


	/**
	 * 死信队列
	 * @return
	 */
	@Bean
	public Queue dlxQueue() {
		return new Queue(DLX_QUEUE,true);
	}

	/**
	 * 死信交换机
	 * @return
	 */
	@Bean
	public DirectExchange dlxExchange() {
		return new DirectExchange(DLX_EXCHANGE,true,false);
	}

	/**
	 * 死信队列和交换机绑定
	 * @return
	 */
	@Bean
	public Binding bindingDLX() {
		return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
	}
}

创建延迟队列,并绑定死信队列

  // 下面的是延迟队列
	/**
	 * 订单延迟队列
	 */
	public static final String ORDER_QUEUE = "queue.order";

	/**
	 * 订单交换机
	 */
	public static final String ORDER_EXCHANGE = "exchange.order";

	/**
	 * 订单routing-key
	 */
	public static final String ORDER_ROUTING_KEY = "routingkey.order";


	/**
	 * 订单延迟队列
	 * @return
	 */
	@Bean
	public Queue orderQueue() {
		Map<String,Object> params = new HashMap<>();
		params.put("x-dead-letter-exchange", DLX_EXCHANGE);
		params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
		return new Queue(ORDER_QUEUE, true, false, false, params);
	}

	/**
	 * 订单交换机
	 * @return
	 */
	@Bean
	public DirectExchange orderExchange() {
		return new DirectExchange(ORDER_EXCHANGE,true,false);
	}

	/**
	 * 订单队列和交换机绑定
	 * @return
	 */
	@Bean
	public Binding orderBinding() {
		return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
	}

绑定死信交换通过添加x-dead-letter-exchangex-dead-letter-routing-key参数指定对应的交换机和路由。

发送消息

设置五秒超时时间

@RestController
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/dlx")
    public String dlx() {
        String date = DateUtil.dateFormat(new Date());
        String delayTime = "5000";
        System.out.println("【发送消息】延迟 5 秒 发送时间 " + date);
        rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setExpiration(delayTime);
                    return message1;
                });
       return "ok";         
    }
    
    class DateUtil{
       public static String dateFormat(Date date) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(date);
      }
    } 
}    

消费消息

@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)
public void delayPrecss(String msg,Channel channel,Message message){
    System.out.println("【接收消息】" + msg + " 接收时间" + DateUtil.dateFormat(new Date()));
}
    

控制台输出

【发送消息】延迟5 秒 发送时间 21:32:15
【接收消息】延迟5 秒 发送时间 21:32:15 接收时间21:32:20

发送消息,5秒之后消费者后会收到消息。说明延迟成功。

队列都有先进先出的特点,如果队列前面的消息延迟比后的消息延迟更长,会出现什么情况。

消息时序问题

发送三条消息,延迟分别是10s2s5s

@RestController
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/dlx")
    public String dlx() {
         dlxSend("延迟10秒","10000");
         dlxSend("延迟2 秒","2000");
         dlxSend("延迟5 秒","5000");
         return "ok";
    }
    
    private void dlxSend(String message,String delayTime) {
         System.out.println("【发送消息】" + message +  "当前时间" + DateUtil.dateFormat(new Date()));
         rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setExpiration(delayTime);
                    return message1;
                });
    }

控制台输出:

【发送消息】延迟10秒当前时间21:54:36
【发送消息】延迟2 秒当前时间21:54:36
【发送消息】延迟5 秒当前时间21:54:36
【接收消息】延迟10秒 当前时间21:54:46
【接收消息】延迟2 秒 当前时间21:54:46
【接收消息】延迟5 秒 当前时间21:54:46

所有的消息都要等10s的消息消费完才能消费,当10s消息未被消费,其他消息也会阻塞,即使消息设置了更短的延迟。因为队列有先进先出的特征,当队列有多条消息,延迟时间就没用作用了,前面的消息消费后,后的消息才能被消费,不然会被阻塞到队列中。

插件实现解决消息时序问题

针对上面消息的时序问题,RabbitMQ开发一个延迟消息的插件delayed_message_exchange,延迟消息交换机。使用该插件可以解决上面时序的问题。

在Github官网找到对应的版本,我选择的是3.8.17

将文件下载下来放到服务器的/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins目录下,执行以下命令,启动插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动插件,交换机会有新的类型x-delayed-message:

x-delayed-message类型的交换机,支持延迟投递消息。发送消息给x-delayed-message类型的交换流程图:

  • x-delayed-message类型的交换机接收消息投递后,并未将直接路由到队列中,而是存储到mnesia(一个分布式数据系统),该系统会检测消息延迟时间。
  • 消息达到可投递时间,消息会被投递到目标队列。

配置延迟队列

@Configuration
public class XDelayedMessageConfig {
  /**
	 * 队列
	 */
	public static final String DIRECT_QUEUE = "queue.delayed";

	/**
	 * 延迟交换机
	 */
	public static final String DELAYED_EXCHANGE = "exchange.delayed";

	/**
	 * 绑定的routing key
	 */
	public static final String ROUTING_KEY = "routingKey.bind";

	@Bean
	public Queue directQueue() {
		return new Queue(DIRECT_QUEUE,true);
	}

	/**
	 * 定义延迟交换机
	 * 交换机的类型为 x-delayed-message
	 * @return
	 */
	@Bean
	public CustomExchange delayedExchange() {
		Map<String,Object> map = new HashMap<>();
		map.put("x-delayed-type","direct");
		return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
	}

	@Bean
	public Binding delayOrderBinding() {
		return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
	}

}

发送消息:

    @GetMapping("/delay")
    public String delay() {
	    delaySend("延迟队列10 秒",10000);
	    delaySend("延迟队列5 秒",5000);
	    delaySend("延迟队列2 秒",2000);
        return "ok";
    }
    
    private void delaySend(String message,Integer delayTime) {
        message = message + " " + DateUtil.dateFormat(new Date());
        System.out.println("【发送消息】" + message);
        rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setDelay(delayTime);
                    //message1.getMessageProperties().setHeader("x-delay",delayTime);
                    return message1;
                });
    }    

消费消息:

    @RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)
    public void delayProcess(String msg,Channel channel, Message message) {
        System.out.println("【接收消息】" + msg + " 当前时间" + DateUtil.dateFormat(new Date()));
   }

控制台输出:

【发送消息】延迟队列10 秒 22:00:01
【发送消息】延迟队列5 秒 22:00:01
【发送消息】延迟队列2 秒 22:00:01
【接收消息】延迟队列2 秒 22:00:01 当前时间22:00:03
【接收消息】延迟队列5 秒 22:00:01 当前时间22:00:05
【接收消息】延迟队列10 秒 22:00:01 当前时间22:00:10

解决了消息的时序问题。

总结

  • 使用Java自带的延迟消息,系统重启或者挂了之后,消息就无法发送,不适于用在生产环境上。
  • RabbitMQ本身不支持延迟队列,可以使用存活时间ttl + 死信队列dlx实现消息延迟。
    • 发送的消息设置ttl,所在的队列不设置消费者。
    • 队列绑定死信队列,消息超时之后,变成死信消息,再发送给死信队列,最后发送给消费者。
  • 发送多条不同延迟时间消息,前面消息没有到延迟时间,会阻塞后面延迟更低的消息,因为队列有先进先出的特性。
  • RabbitMQx-delay-message插件可以解决消息时序问题。
    • 带有ttl的消息发送x-delayed-message类型的交换机,消息不会直接路由到队列中。而且存储到分布式数据系统中,该系统会检测消息延迟时间。
    • 消息到达延迟时间,消息才能会投递到队列中,最后发送给消费者。

Github 源码

  • Github 源码

参考

  • Time-To-Live and Expiration

  • Dead Letter Exchanges

  • 领导看了我写的关闭超时订单,让我出门左转!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386839.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

buuctf-pwn write-ups (10)

文章目录buu073-hitcontraining_bambooboxbuu074-cmcc_pwnme2buu075-picoctf_2018_got-shellbuu076-npuctf_2020_easyheapbuu077-wdb_2018_2nd_easyfmtbuu078-PicoCTF_2018_can-you-gets-mebuu079-mrctf2020_easy_equationbuu080-ACTF_2019_babystackbuu081-mrctf2020_shellcod…

C语言每日一题】——杨氏矩阵

【C语言每日一题】——倒置字符串&#x1f60e;前言&#x1f64c;杨氏矩阵&#x1f64c;总结撒花&#x1f49e;&#x1f60e;博客昵称&#xff1a;博客小梦 &#x1f60a;最喜欢的座右铭&#xff1a;全神贯注的上吧&#xff01;&#xff01;&#xff01; &#x1f60a;作者简介…

WebGL系列教程 — 绘制点(版本2、版本3、版本4、版本5)

目录 0.WebGL简介&#xff1a; 绘制一个点 绘制一个点&#xff08;版本2&#xff09; 绘制一个点&#xff08;版本3&#xff09; 绘制一个点&#xff08;版本4&#xff09; 实现 绘制一个点&#xff08;版本5&#xff09; 0.WebGL简介&#xff1a; WebGL是一种用于在We…

Linux学习第六节-Facl访问控制列表

1.概念 FACL&#xff08;Filesystemctl Access Control List &#xff09; 文件系统访问控制列表&#xff1a;利用文件扩展属性保存额外的访问控制权限&#xff0c;单独为每一个用户量身定制一个权限。 命令格式&#xff1a; setfacl [选项] 归属关系:用户名:权限 目录 常用选…

【数据结构起航】:衡量算法的好坏--时间空间复杂度

时间复杂度和空间复杂度 文章目录时间复杂度和空间复杂度1.算法效率1.1算法复杂度1.2复杂度在OJ里的应用2.时间复杂度2.1时间复杂度的概率2.2大O渐进表示法推导大O阶方法&#xff1a;2.3时间复杂度的举例计算3.空间复杂度3.1空间复杂度的举例计算4.复杂度各量级对比1.算法效率 …

ECharts数据可视化库--导入EChars库

目录 一.ECharts基本认识 二.EChars库导入 1.下载echart.js文件 2.在IDEA引入eharts.js 三.数据可视化绘制 最快乐的学习就是从繁入简&#xff0c;虽然此文章比较简短&#xff0c;但花最少的时间学最多的技能是真的香&#xff0c;点个关注吧&#xff01;这个是数据可视化的…

Kubernetes三 Kubernetes之Pod控制器与service详解

KubernetesPod详解 一 Pod控制器详解 1.1 Pod控制器介绍 Pod是kubernetes的最小管理单元&#xff0c;在kubernetes中&#xff0c;按照pod的创建方式可以将其分为两类&#xff1a; 自主式pod&#xff08;不推荐&#xff09;&#xff1a;kubernetes直接创建出来的Pod&#xf…

MDK Keil查看map文件及编译占用内存信息、函数入口地址

一、在哪里打开map文件查看&#xff08;1&#xff09;第一种&#xff0c;在keil软件下&#xff0c;双击你的工程名&#xff0c;map就会弹出&#xff08;2&#xff09;第二种&#xff0c;点击魔术棒&#xff0c;查看 Lis文件输出的位置入口我的在output...文件下&#xff0c;那我…

努力构建15分钟听力圈,腾讯天籁行动助力听障老人更快融入数字社会

3月3日&#xff0c;腾讯与北京听力协会联合举办线上技术研讨会&#xff0c;以“AI助听技术发展与应用创新”为主题&#xff0c;汇聚国内从事AI助听、辅听相关理论研究及前沿技术落地的专家学者&#xff0c;共同探讨当前人工智能在听力健康领域的研究热点和实践成果。会上&#…

得物供应链复杂业务实时数仓建设之路

1 背景 得物供应链业务是纷繁复杂的&#xff0c;我们既有 JIT 的现货模式中间夹着这大量的仓库作业环节&#xff0c;又有到仓的寄售&#xff0c;品牌业务&#xff0c;有非常复杂的逆向链路。在这么复杂的业务背后&#xff0c;我们需要精细化关注人货场车的效率和成本&#xff…

Malware Dev 03 - 隐匿之 Command Line Spoofing 原理解析

写在最前 如果你是信息安全爱好者&#xff0c;如果你想考一些证书来提升自己的能力&#xff0c;那么欢迎大家来我的 Discord 频道 Northern Bay。邀请链接在这里&#xff1a; https://discord.gg/9XvvuFq9Wb我拥有 OSCP&#xff0c;OSEP&#xff0c;OSWE&#xff0c;OSED&…

浅分析BIG-建筑展示系统

一、主页&#xff08;主要界面&#xff09;重点疑点&#xff08;需要解决&#xff09;1.云平台实时同步。是否可以电脑与hololens2同步或链接&#xff1f;并可以传输信息提醒&#xff1f;一级界面&#xff08;启动界面&#xff09;1.交互式启动激活效果&#xff08;触发按钮旋转…

TCP协议三次握手的原因是什么?为什么不用两次握手和4次握手?

今天复习了TCP协议的三次握手&#xff0c;对上一篇C网络编程有了更深的理解。当时考研的时候计网学过&#xff0c;这里再总结一下分享。网图都是截图来的&#xff0c;侵删。TCP协议属于传输层协议&#xff0c;上面的应用层协议包括HTTP、FTP之类&#xff0c;应用层协议是最接近…

Prometheus 监控云Mysql和自建Mysql(多实例)

本文您将了解到 Prometheus如何配置才能监控云Mysql(包括阿里云、腾讯云、华为云)和自建Mysql。 Prometheus 提供了很多种Exporter&#xff0c;用于监控第三方系统指标&#xff0c;如果没有提供也可以根据Exporter规范自定义Exporter。 本文将通过MySQL server exporter 来监控…

通达信波段主图指标公式,源码简洁原理却不简单

通达信波段主图指标公式的核心语句也就4句&#xff0c;后面的语句都是为了画图的。公式看起来比较简单&#xff0c;原理也比较巧妙&#xff0c;但是理解起来有些困难。 直接上源码&#xff1a; HH:HHV(H,5); LL:LLV(L,5); TH:BARSLAST(H>REF(HH,1)); TL:BARSLAST(L<REF(…

K8s(v1.25.1) 高可用集群(3 Master + 5 Node) Ansible 剧本部署(CRI使用docker,cri-docker)

写在前面 分享一个 k8s 高可用集群部署的 Ansible 剧本以及涉及到的一些工具的安装博文内容涉及&#xff1a;从零开始 一个 k8s 高可用 集群部署 Ansible剧本编写&#xff0c;编写后搭建 k8s 高可用 集群一些集群常用的 监控&#xff0c;备份工具安装&#xff0c;包括&#xff…

边缘计算:万字长文详解高通SNPE inception_v3推理实战

本文主要讲解利用高通SNPE进行神经网络推理&#xff0c;主要参考&#xff1a; 上手SNPE&#xff0d;推理inception_v3 - 知乎 文中是容器做的&#xff0c;在conda环境下做一样的&#xff0c;没问题&#xff0c;已跑通。 在anaconda环境中使用conda命令安装cuda、cudnn、tens…

数据结构与算法系列之单链表

&#x1f497; &#x1f497; 博客:小怡同学 &#x1f497; &#x1f497; 个人简介:编程小萌新 &#x1f497; &#x1f497; 如果博客对大家有用的话&#xff0c;请点赞关注再收藏 &#x1f31e; 这里写目录标题test.hSList.h注意事项一级指针与二级指针的使用assert的使用空…

内大892复试真题19年

内大892复试真题19年 1. 统计低于平均分的人数2. 输出数组中最大值3. 一元二次方程求根4. 字符串数组平移(反转法)5. 矩阵乘法(分治+strassen思想)1. 统计低于平均分的人数 问题 代码 #include <iostream>using namespace std;// 函数声明 double avgFunc

0098 Mysql01

1.登录Mysql mysql -uroot -p密码 2.Mysql常用命令 退出:exit 查看mysql有哪些数据库&#xff1a;show databases;(以分号结尾) 选择使用某个数据库&#xff1a;use sys; (表示正在使用一个名叫sys得数据库) 创建数据库&#xff1a;create database bjpowernode; 查看某个数…