RabbitMQ之延迟队列

news2024/11/26 22:48:33

        延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。

        例如下面的业务场景:在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时间,如果15分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?

  1. 可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很低效,很多时候做的都是些无用功;
  2. 可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在了;
  3. 还可以用延迟队列,锁座成功后会发送1条延迟消息,这条消息15分钟后才会被消费,消费的过程就是检查这个座位是否已经是“已付款”状态;

        你在公司的协同办公系统上面预约了一个会议,邀请汪产品和陈序员今晚22点准时参加会有。系统还比较智能,除了默认发会议邀请的邮件告知参会者以外,到了今晚21:45分的时候(提前15分钟)就会通知提醒参会人员做好参会准备,会议马上开始...

        同样的,这也可以通过轮询“会议预定表”来实现,比如我每分钟跑一次定时任务看看当前有哪些会议即将开始了。当然也可以通过延迟消息来实现,预定会议以后系统投递一条延迟消息,而这条消息比较特殊不会立马被消费,而是延迟到指定时间后再触发消费动作(发通知提醒参会人准备)。不过遗憾的是,在AMQP协议和RabbitMQ中都没有相关的规定和实现。不过,我们似乎可以借助上一小节介绍的“死信队列”来变相的实现。

可以使用rabbitmq_delayed_message_exchange插件实现。

        这里和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,二基于插件存放消息在延时交换机里(x-delayed-message exchange)。

  • 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
  • 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它
  • 队列(queue)再把消息发送给监听它的消费者(customer) 

下载插件

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

安装插件:

        将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins 

启用插件

rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启rabbitmq-server

systemctl restart rabbitmq-server

SpringBoot代码案例

(1)xml配置文件与properties连接配置

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
spring.application.name=delayed_exchange
spring.rabbitmq.host=192.168.80.121
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
# 设置手动确认消息
#spring.rabbitmq.listener.simple.acknowledge-mode=manual

(2)SpringBootApplication主入口类

package com.lagou.rabbit.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Demo02RabbitmqDelayedApplication {

    public static void main(String[] args) {
        SpringApplication.run(Demo02RabbitmqDelayedApplication.class, args);
    }

}

(3)RabbitMQ的对象配置

package com.lagou.rabbit.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableRabbit
@ComponentScan("com.lagou.rabbit.demo")
public class RabbitConfig {

    @Bean
    public Queue queue() {
        return new Queue("queue.delayed", true, false, false, null);
    }

    @Bean
    public Exchange exchange() {
        Map<String, Object> arguments = new HashMap<>();
        // 使用x-delayed-type指定交换器的类型
        arguments.put("x-delayed-type", ExchangeTypes.DIRECT);
        // 使用x-delayed-message表示使用delayed exchange插件处理消息
        return new CustomExchange("ex.delayed", "x-delayed-message", true, false, arguments);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("key.delayed").noargs();
    }

    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }

    @Bean
    @Autowired
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

(4)使用推消息模式接收延迟队列的广播

package com.lagou.rabbit.demo.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

import java.io.IOException;

@Component
public class MyMeetingListener {
    @RabbitListener(queues = "queue.delayed")
    public void onMessage(Message message, Channel channel) throws IOException {
        System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));

        // 消息确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

(5)开发RestController,用于向延迟队列发送消息,并指定延迟的时长

package com.lagou.rabbit.demo.controller;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RestController
public class DelayedController {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @RequestMapping("/meeting/{second}")
    public String bookMeeting(@PathVariable Integer second) {

        // RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
        // 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
        // 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
        // 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
        MessageProperties messageProperties = MessagePropertiesBuilder.newInstance()
                // 设置消息的过期时间
                .setHeader("x-delay", (second - 10) * 1000)
                .setContentEncoding("utf-8")
                .build();

        Message message = MessageBuilder.withBody("还有10s开始开会了".getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();

        // 如果不设置message的properties,也可以使用下述方法设置x-delay属性的值
        // rabbitTemplate.convertAndSend("ex.delayed", "key.delayed", message, msg -> {
        // 使用定制的属性x-delay设置过期时间,也就是提前5s提醒
         // 当消息转换完,设置消息头字段
        // msg.getMessageProperties().setHeader("x-delay", (seconds - 5) * 1000);
            // return msg;
        // });

        amqpTemplate.send("ex.delayed","key.delayed",message);

        return "会议订好了";
    }
}

(6)访问,然后查看输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/51879.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

zcu106 lwip搭建以太网配置寄存器

文章目录实验一1.配置网口GEM32.导出xsa文件&#xff0c;在vitis中创建工程&#xff0c;选择freertos10_xilinx的操作系统来使用3.配置lwip211&#xff0c;选择SOCKET API的模式4.创建工程 选择FreeRTOS Iwip TCP Perf Server模板5.代码分析main.cfreertos_tcp_perf_server.cfr…

基于yolov5n的轻量级MSTAR遥感影像目标检测系统设计开发实战

做过很多目标检测类的项目了&#xff0c;最近看到一个很早之前用过的数据集MSTAR&#xff0c;之前老师给的任务是基于这个数据集来搭建图像识别模型&#xff0c;殊不知他也是可以用来做目标检测的&#xff0c;今天正好有点时间就想着基于这个数据集来做一下目标检测实践。 首先…

利用车载摄像头了解道路语义的鸟瞰图

以下内容来自从零开始机器人SLAM知识星球 每日更新内容 点击领取学习资料 → 机器人SLAM学习资料大礼包 #论文##开源代码# Understanding Bird’s-Eye View of Road Semantics using an Onboard Camera 论文地址&#xff1a;https://arxiv.org/abs/2012.03040 作者单位&#…

自助建站工具

每用一次自助建站工具&#xff0c;就有一个程序员失业。 作为企业老板的你&#xff0c;要为公司的获客&#xff0c;企业推广发愁&#xff0c;但是预算有限&#xff0c;招人也很困难&#xff0c;不仅要面试程序员&#xff0c;后续还要检验这个程序员的功力&#xff0c;实在是太…

CentOS升级python3版本

介绍 本文将详细介绍在CentOS7.9系统的服务器将自带的python3.6.8版本升级到3.8.0版本的过程。 在升级前CentOS7.9中已经同时存在两个python版本分别是2.7.5和3.6.8。 查看CentOS版本命令&#xff1a; cat /etc/centos-release这是我升级后的python版本&#xff08;python3升…

Minio设置文件永久访问和下载

1. docker pull minio/mc 2. docker run -it --entrypoint/bin/sh minio/mc 3. mc config host add <ALIAS> <YOUR-S3-ENDPOINT> <YOUR-ACCESS-KEY> <YOUR-SECRET-KEY> [--api API-SIGNATURE] mc ls minio ALIAS: 别名就是给你的云存储服务起了一个…

2021-02-01

oracle设置定期修改密码 --通过如下sql查询用户密码有效期配置 SELECT username,PROFILE FROM dba_users; --上述sql查询结果一般为default --使用如下sql可以查询到default的默认值 select * from dba_profiles where profile DEFAULT and resource_name PASSWORD_LI…

HTML+CSS大作业【传统文化艺术耍牙15页】学生个人网页设计作品

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

MES系统中生产计划模块的重要作用

MES系统中的“生产调度”支持从“工单管理”中收到的作业队列&#xff0c;根据生产目标&#xff08;时间和数量&#xff09;&#xff0c;必须考虑到人员、设备、材料的可用性等限制和生产过程中的各种中断&#xff0c;生成一个作业时间表&#xff0c;即生产作业计划。MES系统“…

iOS 接入firebase简易步骤

接入准备 去firebase官网注册应用并下载配置文件GoogleService-Info.plist 接入步骤 1.通过cocopods导入以下两个依赖 pod Firebase/Analytics pod Firebase/Core 2.导入成功后将配置文件GoogleService-Info.plist拖入项目中 3.代码支持 引入#import <Firebase/Firebas…

【云原生系列】第五讲:Knative Eventing 下

目录 序言 1.Parallel介绍 1.1 Parallel Spec ​编辑 2.Sequence 2.1.Sequence Spec 2.2适用场景 2.3 Broker/Trigger 2.4 代码示例 3.投票 序言 三言两语&#xff0c;不如细心探索。 今天整理了一下Eventing 相关知识点 ParallelSequence希望此文&#xff0c;能帮…

C#正则表达式总结

推荐一个专门用于编写正则表达式的网站&#xff1a; regex101: build, test, and debug regex 参考文档&#xff1a; https://zh.wikipedia.org/wiki/%E6%AD%A3%E5%88%99%E8%A1%A8%E8%BE%BE%E5%BC%8F 特殊字符的意义&#xff1a; ^ : 表示字符串的开头 例子&#xff1a; …

局域网的网络硬件主要包括有什么

一、网络服务器 是计算机局域网的核心部件。 网络操作系统是在网络服务器上运行的&#xff0c;网络服务器的效率直接影响整个网络的效率。 因此&#xff0c;一般要用高档计算机或专用服务器计算机作为网络服务器。 二、网络工作站 网络工作站是通过网络接口卡连接到网络上的…

银行软件测试:基于互联网金融平台的测试框架设计与分析

目前互联网金融火的一塌糊涂&#xff0c;基于互联网金融平台的自动化测试的项目也是如火如荼的进行。笔者手头上负责一个p2p项目的测试框架开发&#xff0c;因此如何设计一套有效的测试框架也成为工作所需和互相交流测试经验的必须。进入》软件测试社群学习交流 这个网站的后台…

科研绘图配色方案

科研绘图配色方案 在撰写论文的时候&#xff0c;美观&#xff0c;大气&#xff0c;上档次的图表能够很好地给自己的论文加分。但是在绘制图表的时候往往会面临色彩搭配的问题&#xff0c;选择合适的色彩搭配能够有效地展示自己的方法&#xff0c;但是色彩搭配选择不当的话往往…

Go学习之路:并发(DAY 3)

文章目录前引并发1、Go协程/简单创建2.1、信道/简单创建信道2.2、信道/限制了大小的信道2.3、信道/range close信道3.1、Select语句前引 听了会歌 一看了下今天已经下午2&#xff1a;50了 我们学校也好像开始放学生们回家了 那今天最后就愉快愉快的把我们的 A Tour Of Go 最后…

申报高新技术企业需要哪些条件?

高新技术企业是在国家重点支持的高新技术领域中持续进行科研开发和技术成果转化&#xff0c;形成企业核心自主知识产权&#xff0c;并以此为基础开展生产经营活动的企业。成为高新技术企业以后会减免40%的所得税。 申报高新技术企业需要哪些条件&#xff1f; 第一点&#xff…

回收站不见了怎么恢复?一招把回收站的图标找回来!

有时我们会有这样的烦恼&#xff0c;明明回收站里面的东西还没被删除或者是还在回收站里面&#xff0c;但是回收站的图标就是看不到了。其实&#xff0c;这正是我们所说的电脑回收站不显示图标导致。因为我们关闭回收站位置以后&#xff0c;电脑系统都会自动把回收站置为“已删…

TPM零知识学习四 —— tpm2-tss源码安装

tpm2-tss包的的安装方法参考&#xff1a; tpm2-tss/INSTALL.md at master tpm2-software/tpm2-tss GitHub 1. 源码下载 $ git clone https://github.com/tpm2-software/tpm2-tss.git 下载完成后&#xff0c;源码内容如下所示&#xff1a; $ ls afl-fuzzing CHANGELOG.m…

基于ssm的小区物业管理系统

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。这里根据疫情当下&#xff0c;你想解决的问…