rabbitmq延迟队列的使用

news2024/10/6 16:20:15

rabbitmq延迟队列的使用

1、场景:

1.定时发布文章
2.秒杀之后,给30分钟时间进行支付,如果30分钟后,没有支付,订单取消。
3.预约餐厅,提前半个小时发短信通知用户。
A -> 13:00 17:00 16:30 延迟时间: 730 * 60 * 1000
B -> 11:00 18:00 17:30 延迟时间: 13
30 * 60 * 1000
C -> 8:00 14:00 13:30 延迟时间: 11*30 * 60 * 1000

第一种方式:创建具有超时功能且绑定死信交换机的消息队列

 @Bean
    public Queue directQueueLong(){
        return   QueueBuilder.durable("业务队列名称")
                .deadLetterExchange("死信交换机名称")
                .deadLetterRoutingKey("死信队列 RoutingKey")
                .ttl(20000) // 消息停留时间
                //.maxLength(500)
                .build();
    }

监听死信队列,即可处理超时的消息队列

缺点:
上述实现方式中,ttl延时队列中所有的消息超时时间都是一样的,如果不同消息想设置不一样的超时时间,就需要建立多个不同超时时间的消息队列,比较麻烦,且不利于维护。

第二种方式:创建通用延时消息

rabbitTemplate.convertAndSend("交换机名称", "RoutingKey","对象",
	message => {
		message.getMessageProperties().setExpiration(String.valueOf(5000))
			        return message;
	            }
           );

缺点:
该种方式可以创建一个承载不同超时时间消息的消息队列,但是这种方式有一个问题,如果消息队列中排在前面的消息没有到超时时间,即使后面的消息到了超时时间,先到超时时间的消息也不会进入死信队列,而是先检查排在最前面的消息队列是否到了超时时间,如果到了超时时间才会继续检查后面的消息。

第三种方式:使用rabbitmq的延时队列插件,实现同一个队列中有多个不同超时时间的消息,并按时间超时顺序出队

1、下载延迟插件

在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。
我这里 MQ 的版本是 3.10.0 现在去 GitHub 上根据版本号下载插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

2、安装插件并启用

  • 我用的是 Docker 客户端,下载完成后直接把插件放在 /opt/rabbitmq 目录,然后拷贝到容器内plugins目录下(rabbitmq是容器的name,也可以使用容器id)
docker cp /opt/rabbitmq/rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/plugins
  • 进入 Docker 容器
docker exec -it rabbitmq /bin/bash
  • 在plugins内启用插件
#先执行,解除防火墙限制,增加文件权限
cd plugins
umask 0022
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 退出容器
exit
  • 重启 RabbitMQ
docker restart rabbitmq
  • 通过UI查看
    在这里插入图片描述

原理

在这里插入图片描述

代码使用

消费者

/*
 * Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
 *
 */
package com.fpl.consumers;

import cn.hutool.core.map.MapUtil;
import com.fpl.model.OrderingOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * <p>Project: spring-rabbitmq - DelayConsumer</p>
 * <p>Powered by fpl1116 On 2024-04-09 11:34:07</p>
 * <p>描述:<p>
 *
 * @author penglei
 * @version 1.0
 * @since 1.8
 */
@Configuration
@Slf4j
public class DelayConsumer {
    @Bean
    public Queue delayQueue1(){
        return   QueueBuilder.durable("Delay_Q01").lazy().build();
    }
    @Bean
    public CustomExchange delayExchange(){
        //参数x-delayed-type
        Map<String, Object> map = MapUtil.of("x-delayed-type","direct");
        return new CustomExchange("Delay_E01","x-delayed-message",true,false,map);
    }
    @Bean
    public Binding binding1(Queue delayQueue1, CustomExchange delayExchange){
        return BindingBuilder.bind(delayQueue1).to(delayExchange).with("RK01").noargs();
    }

//     @RabbitListener(queues = "Delay_Q01")
    public void receiveMessage(OrderingOk msg) {
        log.info("消费者1 收到消息:"+ msg );
    }


}

生产者

/*
 * Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
 *
 */
package com.fpl.provider;

import com.fpl.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * <p>Project: spring-rabbitmq - DelayProvider</p>
 * <p>Powered by fpl1116 On 2024-04-09 11:35:51</p>
 * <p>描述:<p>
 *
 * @author penglei
 * @version 1.0
 * @since 1.8
 */
@Service
public class DelayProvider {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(OrderingOk orderingOk) {
        rabbitTemplate.convertAndSend("Delay_E01", "RK01", orderingOk,new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                int id  = orderingOk.getId();
                int ttl = 0;
                if(id == 1){
                    ttl = 50*1000;
                }else if(id == 2){
                    ttl = 30*1000;
                }else if(id ==3){
                    ttl = 40*1000;
                }else if(id ==4){
                    ttl = 10*1000;
                }else if(id ==5){
                    ttl = 20*1000;
                }
                //延迟交换机使用的delay参数,设置消息的延期时长,单位是微妙
                message.getMessageProperties().setDelay(ttl);
                //延迟交换机消息默认是持久化的
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                return message;
            }
        });
    }
}

在这里插入图片描述

测试

@Test
    void test5() throws IOException {
        for (int i = 1; i <=5;i++){
            OrderingOk orderingOk = OrderingOk.builder().id(i).name("张 " + i).build();
            delayProvider.send(orderingOk);
            System.out.println("发送成功:"+i);
        }
        System.in.read();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1583468.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

再也不怕面试官问 OOM了,一次生产环境 Metaspace OOM 排查流程实操!

问题背景 小奎公司的运维同时今天反映核心业务一个服务目前 CPU 的使用率、堆内存、非堆内存的使用率有点高。刚反映没有过多久该服务就直接 OOM 了&#xff0c;以下是生产监控平台监控信息。 CPU 使用率监控 堆内存和非堆内存使用率 OOM 产生的日志报错信息 问题分析 根…

kali使用msf+apkhook520+cploar实现安卓手的攻击

学习网络安全的过程中&#xff0c;突发奇想怎么才能控制或者说是攻击手机 边找工作边实验 话不多说启动kali 一、使用msfapktool生成简单的木马程序 首先使用kali自带的msfvenom写上这样一段代码 选择安卓 kali的ip 一个空闲的端口 要输出的文件名 msfvenom -p android/met…

9个最受欢迎的开源自动化测试框架盘点!

自动化测试框架可以帮助测试人员评估多个Web和移动应用程序的功能&#xff0c;安全性&#xff0c;可用性和可访问性。尽管团队可以自己构建复杂的自动化测试框架&#xff0c;但是当他们可以使用现有的开源工具&#xff0c;库和测试框架获得相同甚至更好的结果时&#xff0c;通常…

ubuntu系统逻辑卷Logical Volume扩容根分区

Linux LVM详解 https://blog.csdn.net/qq_35745940/article/details/119054949 https://blog.csdn.net/weixin_41891696/article/details/118805670 https://blog.51cto.com/woyaoxuelinux/1870299 LVM&#xff08;Logical Volume Manager&#xff09;逻辑卷管理&#xff0c…

C++入门语法(命名空间缺省函数函数重载引用内联函数nullptr)

目录 前言 1. 什么是C 2. C关键字 3. 命名空间 3.1 命名空间的定义 3.2 命名空间的使用 4. C输入和输出 5. 缺省函数 5.1 概念 5.2 缺省参数分类 6. 函数重载 6.1 概念 6.2 为何C支持函数重载 7. 引用 7.1 概念 7.2 特性 7.3 常引用 7.4 引用与指针的区别 7…

OSCP靶场--Hetemit

OSCP靶场–Hetemit 考点(python代码注入 systemctrl提权) 1.nmap扫描 ## ┌──(root㉿kali)-[~/Desktop] └─# nmap 192.168.173.117 -sV -sC -Pn --min-rate 2500 -p- Starting Nmap 7.92 ( https://nmap.org ) at 2024-04-10 05:52 EDT Nmap scan report for 192.168.1…

详解多态、虚继承、多重继承内存布局及虚表(C++)

本篇文章深入分析多态、虚继承、多重继承的内存布局及虚函数表以及实现原理。编译器使用VS 2022&#xff0c;直接放结论&#xff0c;代码及内存调试信息在后文。 结论 内存布局 一个没有虚函数的类&#xff0c;它的大小其实就是所有成员变量的大小&#xff0c;此时它就是一个…

13 指针(上)

指针是 C 语言最重要的概念之一&#xff0c;也是最难理解的概念之一。 指针是C语言的精髓&#xff0c;要想掌握C语言就需要深入地了解指针。 指针类型在考研中用得最多的地方&#xff0c;就是和结构体结合起来构造结点(如链表的结点、二叉树的结点等)。 本章专题脉络 1、指针…

redis string底层为什么使用sds, sds好处?redis 的动态字符串优点?

1. redis 的键值对&#xff0c;都是由对象组成的&#xff0c; 其中键总是一个字符串对象&#xff08;string object&#xff09; 而键的value则可以是&#xff1a;“字符串对象”&#xff0c; “列表对象 &#xff08;list object&#xff09;”&#xff0c;“哈希对象 (hash o…

《由浅入深学习SAP财务》:第2章 总账模块 - 2.6 定期处理 - 2.6.3 月末操作:外币评估

2.6.3 月末操作&#xff1a;外币评估 企业的外币业务在记账时一般使用期初的汇率或者即时汇率&#xff0c;但在月末&#xff0c;需要按照月末汇率对外币的余额或者未清项进行重估&#xff08;revaluation&#xff09;。 企业在资产负债表日&#xff0c;应当按照下列规…

nandgame中的Code generation(代码生成)

题目说明&#xff1a; 代码生成为语言的语法规则定义代码生成&#xff0c;以支持加法和减法。 您可以使用在前面级别中定义的堆栈操作&#xff08;如ADD和SUB&#xff09;。代码生成模板通常需要包含规则中其他符号的代码。 这些可以通过方括号中的符号名称插入。例如&#xf…

【初中生讲机器学习】15. EM 算法一万字详解!一起来学!

创建时间&#xff1a;2024-04-08 最后编辑时间&#xff1a;2024-04-10 作者&#xff1a;Geeker_LStar 你好呀~这里是 Geeker_LStar 的人工智能学习专栏&#xff0c;很高兴遇见你~ 我是 Geeker_LStar&#xff0c;一名初三学生&#xff0c;热爱计算机和数学&#xff0c;我们一起加…

西门子PLC(S7-200 SMART)学习笔记1:初识PLC可编程逻辑器件

今日开始我的西门子PLC学习之路&#xff0c;学习的型号以S7-200 SMART为主 主要认识一下PLC是什么、型号怎么看、 通信相关、编程软件、构造及工作原理 目录 西门子官方PLC手册获取&#xff1a; 1、PLC可编程逻辑器件的基本认识&#xff1a; PLC的结构及各部分的作用&#xff…

Kali安装黑屏与进入系统后不显示中文的解决办法

使用镜像版本kali-linux-2024.1-installer-amd64.iso 一.创建虚拟机安装Kali镜像时&#xff0c;安装后要重启时发现左上角有个— 闪动并黑屏&#xff0c;启动不成功 上述办法也很简单&#xff0c;可以试试再windows中的CMD终端输入netsh winsock reset&#xff0c;重启电脑如果…

嵌入式Linux:Linux库函数

目录 1、Linux库函数简介 2、标准C语言库函数 1、Linux库函数简介 Linux 提供了丰富的库函数&#xff0c;涵盖了各种领域&#xff0c;从文件操作到网络编程、图形界面、数学运算等。这些库函数大多数都是标准的 C 库函数&#xff0c;同时也包括一些特定于 Linux 系统的库。 …

【Linux】初识Linux,虚拟机安装Linux系统,配置网卡

前言 VMware软件&#xff1a;首先&#xff0c;确保您已经下载了VMware Workstation软件并安装在电脑上。VMware Workstation是一款功能强大的虚拟化软件&#xff0c;它允许在单一物理机上运行多个操作系统。 Linux镜像文件&#xff1a;需要准备一个Linux操作系统的镜像文件。…

初学python记录:力扣1702. 修改后的最大二进制字符串

题目&#xff1a; 给你一个二进制字符串 binary &#xff0c;它仅有 0 或者 1 组成。你可以使用下面的操作任意次对它进行修改&#xff1a; 操作 1 &#xff1a;如果二进制串包含子字符串 "00" &#xff0c;你可以用 "10" 将其替换。 比方说&#xff0c;…

LWIP一探究竟

1.网卡接收数据的流程 我们网卡接收数据基本上就是开发板上eth接收完数据后产生一个中断,然后释放一个信号量通知网卡接收线程去处理这些接收的数据,然后将这些数据封装成信息,投递到tcpip_mbox邮箱中,LWIP内核线程得到这个消息,就对消息进行解析,根据消息中数据包类型进行处理…

docker使用canal

1. 准备MySql主库 1.1.在服务器新建文件夹 mysql/data&#xff0c;新建文件 mysql/conf.d/my.cnf 其中my.cnf 内容如下 [mysqld] log_timestampsSYSTEM default-time-zone8:00 server-id1 log-binmysql-bin binlog-do-db mall # 要监听的库 binlog_formatROW配置解读&#…

Harmony鸿蒙南向驱动开发-Regulator

Regulator模块用于控制系统中各类设备的电压/电流供应。在嵌入式系统&#xff08;尤其是手机&#xff09;中&#xff0c;控制耗电量很重要&#xff0c;直接影响到电池的续航时间。所以&#xff0c;如果系统中某一个模块暂时不需要使用&#xff0c;就可以通过Regulator关闭其电源…