RabbitMQ延迟列队的使用

news2024/9/22 1:40:10

目录

1. 延迟队列使用场景

2. RabbitMQ中的延迟队列实现思路

3. 实现示例

3。运行项目测试


1. 延迟队列使用场景

延迟队列一般可用于具有时间限制的任务,例如:限时优惠,超时的订单处理等。

对于这种场景,传统的处理方式是任务轮询:通过一个后台任务不断的扫描订单信息,发现有超时订单则进行处理,这种处理方式的优点是实现思路简单,容易把握,缺点是对服务器及数据的压力比较大(因为通常需要扫描大量的数据)。

处理这种场景的第二种方式就是通过延迟队列。消息生产者生成消息并放入队列后,要经过指定的延时时间后消息的消费者才能消费消息。

2. RabbitMQ中的延迟队列实现思路

在RabbitMQ中并没有直接支持延迟队列,没有对应的属性可以设置,在RabbitMQ中实现延迟队列的基本思路是:通过死信队列(DXL)和过期时间(TTL)来实现延迟队列。

即:给队列设置一个过期时间并指定一个死信交换机与其关联,消息生产者的消息发送给队列,但不指定消息消费者,等待消息过期,消息过期后会被转发到相关联的死信队列中,而消息消费者则从死信队列中消费消息。

3. 实现示例

总体思路:

  1. 声明死信交换机,队列, 并将队列绑定到死信交换机。
  2. 声明发送消息的交换机,队列(按照业务需求设置队列的过期时间,但该队列不需要消息消费者),并将队列与交换机关联。
  3. 编写业务代码通过第2步创建的交换机发送消息到队列。( 观察消息过期后将过期的消息转存到死信队列中)
  4. 编写消息消费者,消费死信队列中的消息。(在实际项目中该消息消费者就是延迟任务的处理程序)

具体步骤

准备工作

首先准配RabbitMQ服务器 具体步骤

有道云笔记

springboot 版本2.7.7

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>

application.properties 配置文件


server.port=8081
## rabbitmq config
spring.rabbitmq.host=192.168.164.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=xhz
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=my_vhost
## 消费者数量
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=10
#消费者每次从队列中获取的消息数量
spring.rabbitmq.listener.simple.prefetch=1
#消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
#消费失败,自动重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#启用发送重试
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0

1)声明死信交换机,队列, 并将队列绑定到死信交换机。

在@Configuration类中进行交换机(如:RabbitMQConfig),队列的声明,及绑定操作。

package com.rabbitmq.provider.rabbitmqprovider.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitDLXConfig {
    public final static String NORMAL_QUEUE="normal_queue";
    public final static String NORMAL_ROUTING_KEY = "normal_routing_key";
    public final static String NORMAL_EXCHANGE = "normal_exchange";

    public final static String DELAY_QUEUE = "delay_queue";
    public final static String DELAY_ROUTING_KEY = "delay_routing_key";
    public final static String DELAY_EXCHANGE = "delay_exchange";

    //写法一
    //    普通交换机以及普通队列
    @Bean
    public Queue normalQueue(){
        Map map = new HashMap();
        map.put("x-message-ttl", 20000);//message在该队列queue的存活时间最大为10秒
        map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        map.put("x-dead-letter-routing-key", DELAY_ROUTING_KEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键

        return new Queue(NORMAL_QUEUE, true, false, false, map);
    }
  //写法二
    /**
     * 声明队列,用于实现延迟队列,该队列指定超时时间
     * @return
     */
 /*   @Bean(name="normalQueue")
    public Queue normalQueue() {
        return QueueBuilder
                .durable(NORMAL_QUEUE)
                .withArgument("x-message-ttl", 1000*60*1)
                .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE)
                .withArgument("x-dead-letter-routing-key",DELAY_ROUTING_KEY)
                .build();
    }*/

    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange(NORMAL_EXCHANGE, true, false);
    }

    @Bean
    public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange,
                                 @Qualifier("normalQueue") Queue queue){
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(NORMAL_ROUTING_KEY);
    }

    //    死信交换机及延迟队列
    @Bean
    public Queue delayQueue(){
        return new Queue(DELAY_QUEUE);
    }

    @Bean
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE);
    }

    @Bean
    public Binding delayBinding(){
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with(DELAY_ROUTING_KEY);
    }

}

2)编写发送消息的程序代码

package com.rabbitmq.provider.rabbitmqprovider.web;

import com.rabbitmq.provider.rabbitmqprovider.config.RabbitDLXConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

@RestController
public class SenderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
  
    @RequestMapping("sendDLX")
    public Map sendDLX(){
        Map msg = new HashMap();
        msg.put("msg","这是通过死信交换机投递的消息");
        msg.put("now", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        rabbitTemplate.convertAndSend(RabbitDLXConfig.NORMAL_EXCHANGE,RabbitDLXConfig.NORMAL_ROUTING_KEY,msg);

        Map res = new HashMap();
        res.put("msg","投递成功");
        res.put("code",200);
        return res;
    }

}

3。运行项目测试

访问地址:http://localhost:8081/sendDirect

验证:发送的消息会先发到“delay.queue”队列中,在消息过期后,会将消息发送到“delay.dxl.queue”(死信队列)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/132920.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ceph: ceph基础知识

ceph基础知识 一、基础概念 ceph官方文档 http://docs.ceph.org.cn/ ceph中文开源社区 http://ceph.org.cn/ 1、概述 Ceph是可靠的、可扩展的、统一的、开源分布式的存储系统。 Ceph是一个统一的分布式存储系统&#xff0c;设计初衷是提供较好的性能、可靠性和可扩展性。 C…

Python基础知识(一)

目录 输入输出函数 输入函数&#xff1a;input() 输出函数&#xff1a;print() 算术运算符 关系运算符 逻辑运算符 变量 1.命名规则 2.变量类型 3.动态类型特性 输入输出函数 输入函数&#xff1a;input() name input("请输入&#xff1a;") print(nam…

第二证券|北向资金全年净买入约900亿元 哪些行业和个股成“香饽饽”

2022年A股收官。回顾这一年&#xff0c;面临复杂严峻的国内外环境&#xff0c;A股商场推动完善多元融资支撑机制&#xff0c;加大了对实体经济的金融支撑力度&#xff0c;为中国经济V形复苏做出了奉献。这一年&#xff0c;A股IPO融资规划创出历史新高&#xff0c;存量上市公司打…

驱动的并发和竞争

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、什么是并发&#xff1f;并发并行并发并行模式二、什么是竞争三、如何解决竞争1、原子操作整形原子操作&#xff1a;原子位操作2.自旋锁3.信号量4.互斥锁5.如…

mysql批量更新方法

mysql批量更新方法 实验mysql版本为5.7.20 隔离级别为rr&#xff0c;加锁场景的问题在mysql8.0.18中为复现 方法一 replace into 批量更新 原理&#xff1a;replace into table (col1,col2) values (x1,x2), 操作本质是对重复的记录先delete 后insert 缺点&#xff1a;1、如…

特斯拉Model S及Model X 2023上半年交付,1月6日公布售价

特斯拉Model S及Model X终于快要交付了。 2022年12月30日&#xff0c;广州国际车展盛大开幕。众多车企带来了旗舰车型&#xff0c;让观众直呼太过瘾&#xff0c;其中&#xff0c;人流量爆火的莫过于特斯拉展台。此次&#xff0c;特斯拉携旗下S3XY家族重磅出击&#xff0c;全新车…

【C++基础】08:模板

模板 OVERVIEW模板一、函数模板1.func template基本使用&#xff1a;2.func template案例&#xff1a;数组排序3.函数与函数模板的区别&调用规则&#xff1a;4.func template的局限性&#xff1a;二、类模板1.类模板基本使用&#xff1a;2.类模板与函数模板的区别&#xff…

AOA估计中的MUSIC算法(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 随着阵列信号处理技术的不断发展,到达角估计(Angle Of Arrival)的研究在移动通信系统中具有重要意义。通过分析经典MUSIC算法,…

golang 自定义命令行flag包简单使用

一、为什么需要使用golang自定义命令行 不恰当的比喻&#xff0c;当我们写了一个服务代码后&#xff0c;按照简单的思维&#xff0c;我们会在业务代码中将要连接的数据库 用户名、主机名、端口号、密码写死。 那么也就意味着我们启动该服务后都只能固定连接某一个数据库&#x…

etcd快速入门

etcd是什么 etcd是CoreOS团队于2013年6月发起的开源项目&#xff0c;它的目标是构建一个高可用的分布式键值(key-value)数据库。 etcd内部采用raft协议作为一致性算法&#xff0c;etcd基于Go语言实现。 etcd作为服务发现系统&#xff0c;有以下的特点&#xff1a; 1.简单&#…

分享67个PHP源码,总有一款适合您

链接&#xff1a;https://pan.baidu.com/s/1MzKN0bLDRv0i290R2erMHQ?pwdbo2i 提取码&#xff1a;bo2i PHP源码 分享67个PHP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c;大家下载…

寒假每日一题W1D3——上课睡觉

题目描述 有 N 堆石子&#xff0c;每堆的石子数量分别为 a1,a2,…,aN。 你可以对石子堆进行合并操作&#xff0c;将两个相邻的石子堆合并为一个石子堆&#xff0c;例如&#xff0c;如果 a[1,2,3,4,5]&#xff0c;合并第 2,3 堆石子&#xff0c;则石子堆集合变为 a[1,5,4,5]。…

【学习】backdoor attacks、Adversarial Attack on Images、Adversarial Attack on Audio

文章目录一、后门攻击backdoor attacks1、data poisoning2、backdoored PLM3、defenseONION4、后门攻击:绕过ONION防御5、摘要二、Adversarial Attack on Imagesone pixel attackdifferential evolution三、Adversarial Attack on Audio一、后门攻击backdoor attacks 什么是后…

进程的终止和等待

目录 进程终止 如何获取退出码呢&#xff1f; 进程退出方法有哪些&#xff1f; 对于进程退出&#xff0c;内核OS做了什么&#xff1f; 进程等待 为什么要进行进程等待&#xff1f; 如何进行进程等待&#xff1f; 什么是阻塞和非阻塞等待&#xff1f; 进程终止 对于进程…

飞依诺冲刺科创板上市:上半年出现亏损,因商业秘密纠纷被起诉

近日&#xff0c;飞依诺科技股份有限公司&#xff08;下称“飞依诺”&#xff09;在上海证券交易所递交招股书&#xff0c;准备在科创板上市。本次冲刺上市&#xff0c;飞依诺计划募资11.22亿元&#xff0c;将用于生产基地升级项目、新产品研发与总部基地建设项目、营销网络建设…

【Linux】一文掌握Linux基本指令(下)

本章命令大致总结命令功能cat打印文件内容echo打印文件内容> 输出重定向 >>追加重定向< 输入重定向 more 查看文本内容 less等价于morehead打印文本前n行tail 打印文本后n行 |管道date时间相关cal日历sort文本排序uniq相邻文本降重zip打包压缩unzip解包tar打包/解包…

蓝桥杯寒假集训第四天(全球变暖DFS)

没有白走的路&#xff0c;每一步都算数&#x1f388;&#x1f388;&#x1f388; 题目描述&#xff1a; 有一个正方形区域&#xff0c;里面有大陆和海洋&#xff0c;暂且用‘.’表示海洋&#xff0c;用‘#’表示大陆。我们把上下左右都连在一起的大陆称之为岛屿。但是随着气温…

07 来自于网友的 retrieveFileStream 的一个问题, 导致系统程序异常

前言 可以先参考前面一篇文章 retrieveFileStream 之后需要调用 completePendingCommand 否则业务代码会存在问题 retrieveFileStream 之后需要调用 completePendingCommand 否则业务代码会存在问题 这里的问题 主要是来自于 某 qq 交流群的网友 呵呵 当然 这里测试用例代码…

新鲜速递:Spring Data JPA 3.0快速入门、进阶到精通

第一章、安装Spring Data JPA 第一步&#xff0c;先确保你使用的是Spring Boot 3.0或以上环境&#xff0c;可以在pom.xml里加入Spring Data JPA依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-…

【ROS】—— ROS通信机制——话题通信(二)

文章目录前言1. 话题通信1.1 话题通讯理论模型1.2 话题通信基本操作&#xff08;C&#xff09;1.2.1 简单发布框架的实现1.2.2 发布逻辑的实现1.2.3 订阅方的实现1.3 话题通信基本操作&#xff08;python&#xff09;1.3.1 发布的实现1.3.2 订阅的实现1.4 话题通信自定义msg1.4…