RabbitMQ学习-延迟队列

news2024/11/25 15:28:18

延迟队列

背:也就是给队列设置个过期时间,然后到时间消息变成死信,消费死信队列中的消息就行,再没什么玩意,演示队列优化就是不给队列这只TTL,再生产者代码中消息里面设置消息TTL,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。所以就是消费顺序问题要安装个插件

延迟队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定是按处理的元素的队列

延迟队列使用场景

1. 订单在十分钟之内未支付则自动取消
2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

设置TTL的两种方式:

1.队列设置TTL

在创建对别的收设置队列的x-message-ttl属性,例如

Map<String, Object> map = new HashMap<>();
//设置队列有效期为10秒
map.put("x-message-ttl",10000);
channel.queueDeclare(queueName,durable,exclusive,autoDelete,map);

消息设置TTL
对每条消息设置TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
 channel.basicPublish(exchangeName,routingKey,mandatory,properties,"msg body".getBytes());
两者之间的区别:
1.如果设置了队列的TTL属性,那么一旦消息过期,就会被队列的丢弃
2.如果是消息设置了TTL属性,那么即使消息过期,也不一定会马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息挤压情况,那么已
经过期的消息也许还能存活较长时间
3.如果我们没有设置TTL,就表示消息永远不会过期,如果TTL设置为0,则表示除非此时可以直接投递到消费者,否则该消息会被丢弃

整合springboot

添加依赖

 <!--RabbitMQ 依赖-->
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.47</version>
 </dependency>
 <dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 </dependency>
 <!--swagger-->
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger2</artifactId>
 <version>2.9.2</version>
 </dependency>
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger-ui</artifactId>
 <version>2.9.2</version>
 </dependency>
 <!--RabbitMQ 测试依赖-->
 <dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit-test</artifactId>
 <scope>test</scope>
 </dependency>
</dependencies>

 修改配置文件

spring.rabbitmq.host=182.92.234.71
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

添加swagger配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
 @Bean
 public Docket webApiConfig(){
 return new Docket(DocumentationType.SWAGGER_2)
 .groupName("webApi")
 .apiInfo(webApiInfo())
 .select()
 .build();
 }
 private ApiInfo webApiInfo(){
 return new ApiInfoBuilder()
 .title("rabbitmq 接口文档")
 .description("本文档描述了 rabbitmq 微服务接口定义")
 .version("1.0")
 .contact(new Contact("enjoy6288", "http://atguigu.com", 
"1551388580@qq.com"))
 .build();
 }
}

队列TTL

代码架构图

创建两个队列 QA QB ,两者队列 TTL 分别设置为 10S 40S ,然后在创建一个交换机 X 和死信交
换机 Y ,它们的类型都是 direct ,创建一个死信队列 QD ,它们的绑定关系如下:

配置文件类代码

@Configuration
public class TtlQueueConfig {
 public static final String X_EXCHANGE = "X";
 public static final String QUEUE_A = "QA";
 public static final String QUEUE_B = "QB";
 public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
 public static final String DEAD_LETTER_QUEUE = "QD";
 // 声明 xExchange
 @Bean("xExchange")
 public DirectExchange xExchange(){
 return new DirectExchange(X_EXCHANGE);
 }
 // 声明 xExchange
 @Bean("yExchange")
 public DirectExchange yExchange(){
 return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
 }
 //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
 @Bean("queueA")
 public Queue queueA(){
 Map<String, Object> args = new HashMap<>(3);
 //声明当前队列绑定的死信交换机
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 //声明当前队列的死信路由 key
 args.put("x-dead-letter-routing-key", "YD");
 //声明队列的 TTL
 args.put("x-message-ttl", 10000);
 return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
 }
 // 声明队列 A 绑定 X 交换机
 @Bean
 public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queueA).to(xExchange).with("XA");
 }
 //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
 @Bean("queueB")
 public Queue queueB(){
 Map<String, Object> args = new HashMap<>(3);
 //声明当前队列绑定的死信交换机
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 //声明当前队列的死信路由 key
 args.put("x-dead-letter-routing-key", "YD");
 //声明队列的 TTL
 args.put("x-message-ttl", 40000);
 return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
 }
 //声明队列 B 绑定 X 交换机
 @Bean
 public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
 }
 //声明死信队列 QD
 @Bean("queueD")
 public Queue queueD(){
 return new Queue(DEAD_LETTER_QUEUE);
 }
 //声明死信队列 QD 绑定关系
 @Bean
 public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
 @Qualifier("yExchange") DirectExchange yExchange){
 return BindingBuilder.bind(queueD).to(yExchange).with("YD");
 }
}

 消息生产类代码

@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
 @Autowired
 private RabbitTemplate rabbitTemplate;
 @GetMapping("sendMsg/{message}")
 public void sendMsg(@PathVariable String message){
 log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
 rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);
 rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);
 } 
}

消费者消费代码

@Component
public class DeadLetterQueueConsumer {
 @RabbitListener(queues = "QD")
 public void receiveD(Message message, Channel channel) throws IOException {
 String msg = new String(message.getBody());
 log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
 }
}

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是 每增加一个新的时间需求,就要新增一个队列 ,这里只有 10S 40S
两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然
后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
延迟队列优化(队列不设置TTL时间)
代码架构图:
在这里新增了一个队列 QC, 绑定关系如下 , 该队列不设置 TTL 时间
配置文件代码类:
@Component
public class MsgTtlQueueConfig {
 public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
 public static final String QUEUE_C = "QC";
 //声明队列 C 死信交换机
 @Bean("queueC")
 public Queue queueB(){
 Map<String, Object> args = new HashMap<>(3);
 //声明当前队列绑定的死信交换机
 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
 //声明当前队列的死信路由 key
 args.put("x-dead-letter-routing-key", "YD");
 //没有声明 TTL 属性
 return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
 }
 //声明队列 B 绑定 X 交换机
 @Bean
 public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
 @Qualifier("xExchange") DirectExchange xExchange){
 return BindingBuilder.bind(queueC).to(xExchange).with("XC");
 }
}

消息生产者代码

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
 rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
 correlationData.getMessageProperties().setExpiration(ttlTime);
 return correlationData;
 });
 log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
}
发起请求
http://localhost:8080/ttl/sendExpirationMsg/ 你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/ 你好 2/2000
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过
,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/578786.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ElasticSearch——Docker安装ElasticSearch和Kibana

Docker安装ElasticSearch 说明&#xff1a;由于是用docker安装&#xff0c;所以要确保已安装docker并docker环境可用。 docker安装步骤&#xff1a;https://wanli.blog.csdn.net/article/details/121445768 1、Docker安装ElasticSearch 获取指定版本的ES镜像 拉取镜像&#…

Ubuntu安装RabbitMQ server - 在ubuntu+cpolar+rabbitMQ环境下,实现mq服务端远程访问

文章目录 前言1.安装erlang 语言2.安装rabbitMQ3. 内网穿透3.1 安装cpolar内网穿透(支持一键自动安装脚本)3.2 创建HTTP隧道 4. 公网远程连接5.固定公网TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址 转载自cpolar内网穿透的文章&#xff1a;无公网IP&…

nodejs+vue社区重点人员户籍信息查询系统

为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&#xff0c;各行各业相继进入信息管理时代&#xff0c;重点人员信息查询就是信息时代变革中的产物之一。 任何系统都要遵循系统设计的基本流程&#xff0c;本系统也不例外&#xff0c;同样需要…

基于SSM的土家风景文化管理平台

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 前言…

JetBrains的C和C++集成开发环境CLion 2023版本在Win10系统的下载与安装配置教程

目录 前言一、CLion安装二、使用配置总结 前言 CLion是一款为C和C语言开发人员设计的集成开发环境&#xff08;IDE&#xff09;。它提供了丰富的功能和工具&#xff0c;可以帮助开发人员更高效地编写、调试和部署C和C应用程序。 CLion的主要特点&#xff1a; ——代码编辑器…

常见开源协议介绍

开源协议是指开放源代码软件的使用、修改和分发的规则。开源协议的出现&#xff0c;使得开发者可以在保护自己的知识产权的同时&#xff0c;也可以让其他人使用、修改和分发自己的代码。本文将介绍几种常见的开源协议。 一、GPL协议 GPL&#xff08;GNU General Public Licens…

哈希表(哈希函数和处理哈希冲突)_20230528

哈希表&#xff08;哈希函数和处理哈希冲突) 前言 关于哈希表的主题的小记原计划5月23日完成&#xff0c;由于本人新冠阳性&#xff0c;身体发烧乏力&#xff0c;周末感觉身体状况稍加恢复&#xff0c;赶紧打开电脑把本文完成&#xff0c;特别秉承“写是为了更好地思考&#…

搜索算法总结

文章目录 搜索算法1. 深度优先搜索&#xff08;Depth-First-Search, DFS&#xff09;2. 广度优先搜索&#xff08;Breadth-first search, BFS&#xff09;3. 启发式搜索策略3.1 爬山法&#xff08;Hill climbing&#xff09;3.2 最佳优先搜索&#xff08;Best-first search&…

【嵌入式环境下linux内核及驱动学习笔记-(13-中断管理)】

目录 1、中断基本概念2、ARM体系中断系统2.1 ARM具有的七种异常模式与中断的关系2.2 ARM多核环境下的中断2.3 exynos4412(contex A9)的中断 3、中断处理程序架构4、 中断接口编程4.1 中断接口函数4.1.1 request_irq4.1.2 free_irq4.1.3 irqreturn_t4.1.4 irq_handler_t 中断处理…

C语言初阶之函数介绍及练习

函数介绍及练习 1.函数是什么&#xff1f;2.C语言中函数的分类&#xff1a;2.1 库函数2.2 自定义函数 3. 函数的参数3.1 实际参数&#xff08;实参&#xff09;&#xff1a;3.2 形式参数&#xff08;形参&#xff09;&#xff1a; 4.函数的调用4.1 传值调用4.2 传址调用 5. 函数…

真相只有一个——谁是凶手

谁是凶手 1.题目描述2. 解题思路3.代码展示 所属专栏&#xff1a;脑筋急转弯❤️ &#x1f680; >博主首页&#xff1a;初阳785❤️ &#x1f680; >代码托管&#xff1a;chuyang785❤️ &#x1f680; >感谢大家的支持&#xff0c;您的点赞和关注是对我最大的支持&am…

漫游计算机系统

1.信息就是位 上下文 那么什么是信息呢&#xff1f; 在计算机系统中&#xff0c;所有的信息——包括磁盘文件、内存中的程序、内存中存放的用户数据以及网络上传送的数据。本质上是一串比特位。 那么又要了解什么是比特了&#xff0c;比特&#xff08;bit)就是二进制&#xff…

基于标准库函数的STM32的freertos的移植(一)——github源码压缩包下载

由于freertos官网将freertos内核与freertos工程分别进行版本管理&#xff0c;因此下载freertos需要将参考工程和内核分别下载。由于采用ST公司提供的标准库函数进行因此还需要下载标准库函数&#xff0c;然后进行移植配置。具体流程如下详细描述&#xff1a; 1.首先在github的…

git Husky

虽然我们已经要求项目使用eslint了&#xff0c;但是不能保证组员提交代码之前都将eslint中的问题解决掉了&#xff1a; 也就是我们希望保证代码仓库中的代码都是符合eslint规范的&#xff1b; 那么我们需要在组员执行 git commit 命令的时候对其进行校验&#xff0c;如果不符合…

centos7安装docker 并创建mysql

Docker 分为 CE 和 EE 两大版本。CE 即社区版&#xff08;免费&#xff0c;支持周期 7 个月&#xff09;&#xff0c;EE 即企业版&#xff0c;强调安全&#xff0c;付费使用&#xff0c;支持周期 24 个月。 Docker CE 分为 stable test 和 nightly 三个更新频道。 官方网站上有…

关于强电与弱的的介绍

强电&#xff1f;弱电&#xff1f;傻傻分不清楚&#xff0c;今天海翎光电的小编为大家系统的介绍一下强电与弱电。 什么是强电&#xff1f; &#xff08;1&#xff09;供配电系统&#xff1a;供配电系统包括负荷分级、供电措施、负荷力矩、电网谐波限值、用电指标、负荷所需要…

MySQL数据库修改root账户密码

博主今天登录数据库遇到了一个问题&#xff0c;通过这篇文章&#xff08;http://t.csdn.cn/58ECT&#xff09;解决了。文中关于修改root账户密码的部分&#xff0c;博主觉得有必要写一篇文章总结下。 第一步&#xff1a;用管理员账户打开CMD 第二步&#xff1a;开启mysql服务 …

dubbo源码阅读: dubbo的xml文件如何解析的?

dubbo源码阅读&#xff1a; dubbo的xml文件如何解析的&#xff1f; DubboNamespaceHandlerspring 的接口 NamespaceHandlerspring 的抽象类 NamespaceHandlerSupport学以致用 <?xml version"1.0" encoding"UTF-8"?> <beans xmlns:xsi"http…

征文 | 吸引铁粉?成为CSDN明星!

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; 征文 | 吸引铁粉&#xff1f;成为CSDN明星&#xff01; 导读 当今数字时代&#xff0c;社交媒体和在线社区成为了人们交流和分享的主要平台之一&#xff0c;CSDN就是其…

前沿重器[34] | Prompt设计——LLMs落地的版本答案

前沿重器 栏目主要给大家分享各种大厂、顶会的论文和分享&#xff0c;从中抽取关键精华的部分和大家分享&#xff0c;和大家一起把握前沿技术。具体介绍&#xff1a;仓颉专项&#xff1a;飞机大炮我都会&#xff0c;利器心法我还有。&#xff08;算起来&#xff0c;专项启动已经…