RabbitMQ——延迟队列

news2025/1/12 3:45:17

目录

一、延迟队列的应用场景

1. 场景:"订单下单成功后,15分钟未支付自动取消"

① 传统处理超时订单

② RabbitMQ延时队列方案

二、延迟队列中的消息投递和消息消费

1.TTL 和 DLX

 ① TTL

② DLX和死信队列 

③ 延迟队列 

④ 开发步骤 

⑤ json转换 


一、延迟队列的应用场景

1. 场景:"订单下单成功后,15分钟未支付自动取消"

① 传统处理超时订单

采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,
     并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,
     即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,
     然后再做其他的业务操作 

② RabbitMQ延时队列方案

 一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,
     并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失

二、延迟队列中的消息投递和消息消费

1.TTL 和 DLX

  rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换机(DLX)和设置过期时间(TTL)结合起来实现延迟队列

 ① TTL

TTL是Time To Live的缩写, 也就是生存时间。
     RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
     如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
     默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。

     设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。
     设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。

     消息:生产者 -> 交换机 消息在生产者制造消息的时候就开始计算了TTL  TTL=5
     队列:生产者 -> 交换机 -> 路由键 -> 队列 当消息送达到队列的时候才开始计算TTL  TTL=10

② DLX和死信队列 

 DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。

     死信队列是指队列(正常)上的消息(过期)变成死信后,能够发送到另外一个交换机(DLX),然后被路由到一个队列上,
     这个队列,就是死信队列

     成为死信一般有以下几种情况:
     消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
     消息的TTL-存活时间已经过期
     队列长度限制被超越(队列满)

死信队列产生流程:

     
     注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,
     注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明
          x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键

③ 延迟队列 

通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费

   
   注1:延迟队列(即死信队列)产生流程见“images/01 死信队列产生流程.png”  

④ 开发步骤 

 1.生产者创建一个正常消息,并添加消息过期时间/死信交换机/死信路由键这3个参数

     关键代码1
     new Queue(name, durable, exclusive, autoDelete, arguments);
     new Queue(NORMAL_QUEUE, true, false, false, map)

参数说明:

 name:队列名字
     durable:true则持久队列
     exclusive:如果我们声明一个排他队列(该队列将仅由声明者的连接使用),则为true
     autoDelete:服务器不再使用时应删除队列,则为true
     arguments:用于声明队列的参数
       map.put("x-message-ttl", 10000);//message在该队列queue的存活时间最大为10秒
       map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
       map.put("x-dead-letter-routing-key", DELAY_ROUTING_KEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键 

  关键代码2
     new DirectExchange(NORMAL_EXCHANGE, true, false);

   2.消费者A
     正常情况下,由消费者A去消费队列“normal-queue”中的消息,但实际上没有,而是等消息过期

   3.消费者B
     消息过期后,变成死信,根据配置会被投递到DLX,然后根据死信路由键投到死信队列(即延时队列)中

打开我们的虚拟机和连接工具,还有IDEA,确保我们的环境没有问题。

RabbitmqDLXConfig.java 

package com.jwj.rabbitmqprovider.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


/**
 * @author 敢敢
 * @site www.javajwj.com
 * @company xxx公司
 * @create  2022-12-16 21:02
 */
@Configuration
public class RabbitmqDLXConfig {

//    Ctrl+Shift+x:转换为大写

    public static final String NORMAL_QUEUE="normal_queue";
    public static final String NORMAL_EXCHANGE="normal_exchange";
    public static final String NORMAL_ROUTING_KEY="normal_routing_key";

    public static final String DLX_QUEUE="dlx_queue";
    public static final String DLX_EXCHANGE="dlx_exchange";
    public static final String DLX_ROUTING_KEY="dlx_routing_key";

//    普通的交换机及队列
    @Bean
    public Queue normalQueue(){
//        在正常队列中,要添加参数,2:25发送的消息,要在2:40发送到死信交换机中
        Map map = new HashMap();
//        map.put("x-message-ttl", 1000*60*15);//message在该队列queue的存活时间最大为10秒
        map.put("x-message-ttl", 10000);
        map.put("x-dead-letter-exchange", DLX_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键
        return new Queue(NORMAL_QUEUE,true,false,false,map);
    }
    @Bean
    public DirectExchange normalDirectExchange(){
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    @Bean
    public Binding normalBinding(){
        return BindingBuilder.bind(normalQueue())
                .to(normalDirectExchange())
                .with(NORMAL_ROUTING_KEY);
    }

//    死信交换机及队列
    @Bean
    public Queue dlxQueue(){
        return new Queue(DLX_QUEUE);
    }

    @Bean
    public DirectExchange dlxDirectExchange(){
        return new DirectExchange(DLX_EXCHANGE);
    }

    @Bean
    public Binding dlxBinding(){
        return BindingBuilder.bind(dlxQueue())
                .to(normalDirectExchange())
                .with(DLX_ROUTING_KEY);
    }
}

 SendMessageController.java

package com.jwj.rabbitmqprovider.controller;

import com.jwj.rabbitmqprovider.config.RabbitmqDLXConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

/**
 * @author 敢敢
 * @site www.javajwj.com
 * @company xxx公司
 * @create  2022-12-16 21:47
 */
@RestController
public class SendMessageController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendDirect")
    public Map sendDirect(String routingKey){
        Map msg=new HashMap();
        msg.put("msg","直连交换机 jwj-direct-Exchange 发送的消息");
        msg.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh-mm-ss")));

        rabbitTemplate.convertAndSend("jwj-direct-Exchange",
                routingKey,
                msg);

        Map res=new HashMap();
        res.put("code",200);
        res.put("msg","成功");
        return res;

    }

    @RequestMapping("/sendTopic")
    public Map sendTopic(String routingKey){
        Map msg=new HashMap();
        msg.put("msg","主题交换机 jwj-topic-Exchange 发送的消息");
        msg.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh-mm-ss")));

        rabbitTemplate.convertAndSend("jwj-topic-Exchange",
                routingKey,
                msg);

        Map res=new HashMap();
        res.put("code",200);
        res.put("msg","成功");
        return res;

    }

    @RequestMapping("/sendFanout")
    public Map sendFanout(){
        Map msg=new HashMap();
        msg.put("msg","扇形交换机 jwj-fanout-Exchange 发送的消息");
        msg.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh-mm-ss")));

        rabbitTemplate.convertAndSend("jwj-fanout-Exchange",
                null,
                msg);

        Map res=new HashMap();
        res.put("code",200);
        res.put("msg","成功");
        return res;
    }

    @RequestMapping("/sendDLX")
    public Map sendDLX(){
        Map msg=new HashMap();
        msg.put("msg","死信交换机 jwj-fanout-Exchange 发送的消息");
        msg.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh-mm-ss")));

        rabbitTemplate.convertAndSend(RabbitmqDLXConfig.NORMAL_EXCHANGE,
                RabbitmqDLXConfig.NORMAL_ROUTING_KEY,
                msg);
        Map res=new HashMap();
        res.put("code",200);
        res.put("msg","成功");
        return res;
    }
}

运行效果如图所示:

DLXReceiver.java 

package com.jwj.rabbitmqconsumer.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = {"dlx_queue"})
public class DLXReceiver {
	// @RabbitListener(queues = {"direct-queue"})
	@RabbitHandler
	public void handler(Map msg){
//		"修改订单的状态的业务逻辑写在这"
		System.out.println("死信队列中接受到的消息:"+msg);
	}
}

运行结果如下所示: 

⑤ json转换 

 生产者

@Bean
     public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());//指定json转换器
        return rabbitTemplate;
     }

     @Bean
     public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
     }

消费者:

@Bean
     public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jackson2JsonMessageConverter());
        return factory;
     }

      @Bean
     public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
     }

附录一:英文
delay:延迟的
normal:正常
exchange、route、queue
Receiver

⑥ 回顾 

应用场景:支付下单的场景

买商品下单,会有待支付到已支付,或者待支付已取消的过程,该过程会有一个时间间隔15min

难点:数据库中有大量的订单处于待支付状态,每一笔订单的过期时间是不一样的,什么时候检查呢?

传统方案:轮询------>缺陷性能极差,对硬件设备要求极高

TTL:Time to live

DLX:dead letterexchange死信交换机

14:11投递消息,最终会在14:26路由到死信交换机,再路由死信队列中

代码层面:

在实例化queue的时候,绑定参数,绑定map集合

map集合中设置ttl时间,以及绑定的dlx exchange名称,以及dlx的routing key

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/112487.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

spring mvc 通过异常封装 验证 方法

正常情况 我们先演示一下正常情况下我们验证的方法。 首先定义一个LoginBean Data public class LoginBean {// Blank 不允许保存空格,空格不算内容NotBlank(message "用户名不能为空")String username;// Empty 允许保留空格,是空格也算内容…

【Spring】核心部分之AOP:通过列举代码例子,从底层刨析,深入源码,轻轻松松理解Spring的核心AOP,AOP有这一篇足以

AOP基本概念基本原理专业术语案例演示基于注解(重点)基于配置文件基本概念 面向切面编程,也叫面向方面编程,利用aop可以对业务逻辑的各个部分进行隔离,从而使得业务逻辑各个部分之间降低耦合,提高程序的可…

AQS 对资源的共享方式

AQS 定义两种资源共享方式 1) Exclusive(独占) 只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁,ReentrantLock 同时支持两种锁,下面以 ReentrantLock 对这两种锁的定义做介绍: 下面来看 ReentrantLock 中相关的…

概要设计说明书(GB8567——88)基于协同的在线表格forture-sheet

概要设计说明书 1引言 1.1编写目的 为了帮助用户更好的了解和使用本在线表格,提高用户与软件的亲和度。 用户手册描述配置和使用改在线表格,以及该软件使用过程中应该注意的一下问题。 1.2背景 说明: 本用户手册所描述的软件系统的名称…

医疗检测数据存储管理系统

摘要 医疗信息化的迅速发展导致了医疗数据的指数型增长,医疗检测数据存储管理系统给医院现有信息系统带了巨大的压力。一方面,随着各种非结构化数据的不断涌现,现有的医疗信息系统在存储空间,存储速度、存储结构上达不到医疗检测数据的要求,不…

Github惊现神作,这份算法宝典让你横扫各大厂算法面试题

时间飞逝,转眼间毕业七年多,从事 Java 开发也六年了。我在想,也是时候将自己的 Java 整理成一套体系。 这一次的知识体系面试题涉及到 Java 知识部分、性能优化、微服务、并发编程、开源框架、分布式等多个方面的知识点。 写这一套 Java 面试…

使用Docker搭建Nacos的持久化和集群部署

1. 准备 1.1 mysql安装 下载镜像 docker pull mysql/mysql-server:5.7 在宿主机中相关目录,用于挂载容器的相关数据 mkdir -p /data/mysql/{conf,data} 编写my.cnf配置文件,在/data/mysql/conf目录中 (或下载 直接上传即可) my.cnf.txt - 蓝奏云 / …

【考研加油】所有上岸的考研人都有一个共同的特点,就是他们都参加考试了。2023考研加油。

声明:为 2023考研的朋友加油! 2023考研加油 今明两天,将是大部分2023考研人,真正“上战场”的时候。 我想,只有经历过的人,才能对这一历程,感同身受吧! 为你们加油! 以下是在QQ空间看到的一组图,与你们共勉。 距考研还有____天! 确定目标院校中…跨考又能如何?…

阿里人在Github分享的Spring Cloud全栈笔记,你想象不到有多全

微服务到底是什么 微服务到底是什么,一直众说纷纭,我们只知道各大企业纷纷追捧和实践微服务架构,有的项目可能使用了Spring Cloud就算是使用微服务了,然后说微服务就是Spring Cloud,有的系统可能越做越像SOA&#xff…

RV1126笔记十六:吸烟行为检测及部署<四>

若该文为原创文章,转载请注明原文出处。 转换成onnx模型(windows) 一、查看pt文件 准备好训练好的pt文件,可以用Netron打开看看大概长啥样: 二、模型转换 主要的目的是想把训练好的pt文件转成onnx模型,为后面RV1126的部署做铺垫。 我们是在py38的con…

java之多线程的三种不同创建方式and通过多线程模拟龟兔赛跑

Process与Thread: 程序是指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念,而进程则是执行程序的一次执行过程,它是一个动态的概念,是系统资源分配的单位,通常在一个进程中可以…

视频素材网,视频剪辑必备。

视频剪辑没素材,推荐6个网站帮你解决,免费可商用,建议收藏! 1、菜鸟图库 视频素材下载_mp4视频大全 - 菜鸟图库 网站有超多视频素材,全部都是高清无水印,各种类型都有,像自然、城市、动物、科技…

自动控制原理笔记-线性系统的时域分析与校正

目录 时域法的概述: 时域法的作用和特点: 时域法常用的四个时间信号: 线性系统时域性能指标: 五个常用的性能指标: 一阶系统的时间响应及动态性能: 一阶系统动态指标的计算: 一阶系统的典型…

Github一夜爆火的阿里高并发技术小册究竟有什么魅力

阿里在农历2021到来之前却是又搞了一个大动作!把阿里这一年在应对高并发流量的技术经验整合成一份技术小册开源分享供大家学习借鉴。我也是昨天才发现这份小册开源至Github上居然一夜爆火! 看了小册之后才知道,原来阿里在应对高并发大流量时也…

python中的json数据和pyecharts模块入门

目录 一json数据格式 1.什么是json 2.json有什么用 3.json格式数据转化 4.python数据和json数据的相互转化 5.json总结 二.pyecharts模块入门 1.基础折线图 全局配置选项——set_global_opts方法 一json数据格式 1.什么是json JSON是一种轻量级的数据交互格式。可以按…

RabbitMQ 第一天 基础 3 RabbitMQ 快速入门 3.2 入门程序【消费者】

RabbitMQ 【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】 文章目录RabbitMQ第一天 基础3 RabbitMQ 快速入门3.2 入门程序3.2.1 消费者3.2.2 小结第一天 基础 3 RabbitMQ 快速入门 3.2 入门程序 3.2.1 消费者 之前我们 已经完成了生产者的基本代码编…

客快物流大数据项目(九十八):ClickHouse的SQL函数

文章目录 ClickHouse的SQL函数 一、​​​​​​​​​​​​​​类型检测函数

Verilog刷题HDLBits——Exams/review2015 fancytimer

Verilog刷题HDLBits——Exams/review2015 fancytimer题目描述代码结果题目描述 This is the fifth component in a series of five exercises that builds a complex counter out of several smaller circuits. You may wish to do the four previous exercises first (counte…

gateway中的限流与熔断

目录 1. 限流的使用场景 2. gateway限流实现 2.1 前提: 2.2 导入依赖包 2.3 在项目配置文件中配置redis 2.4 开发限流需要的Bean 2.5 为服务配置限流参数 2.6 压力测试 3. 熔断 3.1 熔断的使用场景 3.2 熔断配置 1. 限流的使用场景 为什么限流 限流就是限…

【点云检测】OpenPCDet 教程系列 [1] 安装 与 ROS运行

前言 主要是介绍库的使用,做笔记区 首先搜索的时候有个问题 一直在我脑子里 hhh 就是MMlab其实还有一个叫mmdetection3d 的库,然后搜的时候发现 hhh 有网友和我一样的疑惑: OpenPCDet和mmdetection3d有什么区别 ? - 知乎 (zhihu.com) 这…