RabbitMQ实现延时消息的两种方法

news2024/12/23 10:30:00

RabbitMQ实现延时消息的两种方法

1、死信队列

1.1消息什么时候变为死信(dead-letter)

  1. 消息被否定接收,消费者使用basic.reject 或者 basic.nack并且requeue 重回队列属性设为false。
  2. 消息在队列里得时间超过了该消息设置的过期时间(TTL)。
  3. 消息队列到达了它的最大长度,之后再收到的消息。

1.2死信队列的原理

当一个消息再队列里变为死信时,它会被重新publish到另一个exchange交换机上,这个exchange就为DLX。因此我们只需要在声明正常的业务队列时添加一个可选的"x-dead-letter-exchange"参数,值为死信交换机,死信就会被rabbitmq重新publish到配置的这个交换机上,我们接着监听这个交换机就可以了。

1.3 代码实现

  1. 引入amqp依赖
  2. 声明交换机,队列
package com.lank.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitmqConfig {

    //死信交换机,队列,路由相关配置
    public static final String DLK_EXCHANGE = "dlk.exchange";
    public static final String DLK_ROUTEKEY = "dlk.routeKey";
    public static final String DLK_QUEUE = "dlk.queue";

    //业务交换机,队列,路由相关配置
    public static final String DEMO_EXCHANGE = "demo.exchange";
    public static final String DEMO_QUEUE = "demo.queue";
    public static final String DEMO_ROUTEKEY = "demo.routeKey";

    //延时插件DelayedMessagePlugin的交换机,队列,路由相关配置
    public static final String DMP_EXCHANGE = "dmp.exchange";
    public static final String DMP_ROUTEKEY = "dmp.routeKey";
    public static final String DMP_QUEUE = "dmp.queue";

    @Bean
    public DirectExchange demoExchange(){
        return new DirectExchange(DEMO_EXCHANGE,true,false);
    }

    @Bean
    public Queue demoQueue(){
        //只需要在声明业务队列时添加x-dead-letter-exchange,值为死信交换机
        Map<String,Object> map = new HashMap<>(1);
        map.put("x-dead-letter-exchange",DLK_EXCHANGE);
        //该参数x-dead-letter-routing-key可以修改该死信的路由key,不设置则使用原消息的路由key
        map.put("x-dead-letter-routing-key",DLK_ROUTEKEY);
        return new Queue(DEMO_QUEUE,true,false,false,map);
    }

    @Bean
    public Binding demoBind(){
        return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(DEMO_ROUTEKEY);
    }

    @Bean
    public DirectExchange dlkExchange(){
        return new DirectExchange(DLK_EXCHANGE,true,false);
    }

    @Bean
    public Queue dlkQueue(){
        return new Queue(DLK_QUEUE,true,false,false);
    }

    @Bean
    public Binding dlkBind(){
        return BindingBuilder.bind(dlkQueue()).to(dlkExchange()).with(DLK_ROUTEKEY);
    }


    //延迟插件使用
    //1、声明一个类型为x-delayed-message的交换机
    //2、参数添加一个x-delayed-type值为交换机的类型用于路由key的映射
    @Bean
    public CustomExchange dmpExchange(){
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DMP_EXCHANGE,"x-delayed-message",true,false,arguments);
    }

    @Bean
    public Queue dmpQueue(){
        return new Queue(DMP_QUEUE,true,false,false);
    }

    @Bean
    public Binding dmpBind(){
        return BindingBuilder.bind(dmpQueue()).to(dmpExchange()).with(DMP_ROUTEKEY).noargs();
    }
}
  1. 声明一个类用于发送带过期时间的消息
package com.lank.demo.rabbitmq;

import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 1. @author lank
 2. @since 2020/12/14 10:33
 */
@Component
@Slf4j
public class MessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //使用死信队列发送消息方法封装
    public void send(String message,Integer time){
        String ttl = String.valueOf(time*1000);
        //exchange和routingKey都为业务的就可以,只需要设置消息的过期时间
        rabbitTemplate.convertAndSend(RabbitmqConfig.DEMO_EXCHANGE, RabbitmqConfig.DEMO_ROUTEKEY,message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置消息的过期时间,是以毫秒为单位的
                message.getMessageProperties().setExpiration(ttl);
                return message;
            }
        });
        log.info("使用死信队列消息:{}发送成功,过期时间:{}秒。",message,time);
    }

    //使用延迟插件发送消息方法封装
    public void send2(String message,Integer time){
        rabbitTemplate.convertAndSend(RabbitmqConfig.DMP_EXCHANGE, RabbitmqConfig.DMP_ROUTEKEY,message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
            //使用延迟插件只需要在消息的header中添加x-delay属性,值为过期时间,单位毫秒
                message.getMessageProperties().setHeader("x-delay",time*1000);
                return message;
            }
        });
        log.info("使用延迟插件发送消息:{}发送成功,过期时间:{}秒。",message,time);
    }
}
  1. 编写一个类用于消费消息
package com.lank.demo.rabbitmq;

import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageReceiver {

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.DLK_QUEUE)
    public void onMessage(Message message){
        log.info("使用死信队列,收到消息:{}",new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.DMP_QUEUE)
    public void onMessage2(Message message){
        log.info("使用延迟插件,收到消息:{}",new String(message.getBody()));
    }
}
  1. 编写Controller调用发送消息方法测试结果
package com.lank.demo.controller;
import com.lank.demo.rabbitmq.MessageSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    public MessageSender messageSender;

    //死信队列controller
    @GetMapping("/send")
    public String send(@RequestParam String msg,Integer time){
        messageSender.send(msg,time);
        return "ok";
    }

    //延迟插件controller
    @GetMapping("/send2")
    public String sendByPlugin(@RequestParam String msg,Integer time){
        messageSender.send2(msg,time);
        return "ok";
    }

}
  1. 配置文件application.properties
server.port=4399
#virtual-host使用默认的/就好,如果需要/demo需自己在控制台添加
spring.rabbitmq.virtual-host=/demo
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  1. 启动项目,打开rabbitmq控制台,可以看到交换机和队列已经创建好。

在这里插入图片描述

在这里插入图片描述

  1. 在浏览器中请求http://localhost:4399/send?msg=hello&time=5,从控制台的输出来看,刚好5s后接收到消息。
2020-12-16 22:47:28.071  INFO 13304 --- [nio-4399-exec-1] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信队列消息:hello发送成功,过期时间:5秒。
2020-12-16 22:47:33.145  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信队列,收到消息:hello

1.4死信队列的一个小注意点

当我往死信队列中发送两条不同过期时间的消息时,如果先发送的消息A的过期时间大于后发送的消息B的过期时间时,由于消息的顺序消费,消息B过期后并不会立即重新publish到死信交换机,而是会等到消息A过期后一起被消费。

依次发送两个请求http://localhost:4399/send?msg=消息A&time=30和http://localhost:4399/send?msg=消息B&time=10,消息A先发送,过期时间30S,消息B后发送,过期时间10S,我们想要的结果应该是10S收到消息B,30S后收到消息A,但结果并不是,控制台输出如下:

2020-12-16 22:54:47.339  INFO 13304 --- [nio-4399-exec-5] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信队列消息:消息A发送成功,过期时间:30秒。
2020-12-16 22:54:54.278  INFO 13304 --- [nio-4399-exec-6] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信队列消息:消息B发送成功,过期时间:10秒。
2020-12-16 22:55:17.356  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信队列,收到消息:消息A
2020-12-16 22:55:17.357  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信队列,收到消息:消息B

消息A30S后被成功消费,紧接着消息B被消费。因此当我们使用死信队列时应该注意是否消息的过期时间都是一样的,比如订单超过10分钟未支付修改其状态。如果当一个队列各个消息的过期时间不一致时,使用死信队列就可能达不到延时的作用。这时候我们可以使用延时插件来实现这需求。

2 、延时插件

RabbitMQ Delayed Message Plugin是一个rabbitmq的插件,所以使用前需要安装它,可以参考的GitHub地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

2.1如何实现

  1. 安装好插件后只需要声明一个类型type为"x-delayed-message"的exchange,并且在其可选参数下配置一个key为"x-delayed-typ",值为交换机类型(topic/direct/fanout)的属性。
  2. 声明一个队列绑定到该交换机
  3. 在发送消息的时候消息的header里添加一个key为"x-delay",值为过期时间的属性,单位毫秒。
  4. 代码就在上面,配置类为DMP开头的,发送消息的方法为send2()。
  5. 启动后在rabbitmq控制台可以看到一个类型为x-delayed-message的交换机。

在这里插入图片描述

在这里插入图片描述

  1. 继续在浏览器中发送两个请求http://localhost:4399/send2?msg=消息A&time=30和http://localhost:4399/send2?msg=消息B&time=10,控制台输出如下,不会出现死信队列出现的问题:
2020-12-16 23:31:19.819  INFO 13304 --- [nio-4399-exec-9] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用延迟插件发送消息:消息A发送成功,过期时间:30秒。
2020-12-16 23:31:27.673  INFO 13304 --- [io-4399-exec-10] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用延迟插件发送消息:消息B发送成功,过期时间:10秒。
2020-12-16 23:31:37.833  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : 使用延迟插件,收到消息:消息B
2020-12-16 23:31:49.917  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : 使用延迟插件,收到消息:消息A

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/671821.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java 设计模式--创建者模式

参考&#xff1a;Java常见设计模式总结 概念 概念理解一&#xff1a;将复杂对象的创建过程分解在不同的方法中&#xff0c;不同的创建过程组装成不同对象。对象的创建与产品本身分离开&#xff0c;使得对象的创建过程更加清晰。例如&#xff1a;旅游套餐售卖场景。 一个套餐大…

@DateTimeFormat与@JsonFormat不完全解析

目录 前言测试代码DateTimeFormat不加任何注解的情况普通请求JSON请求 JsonFormat普通请求JSON请求 其他方式&#xff08;InitBinder&#xff09;结论源码地址 前言 一直以来对DateTimeFormat与JsonFormat 比较模糊&#xff0c;容易搞忘&#xff0c;今天就做个笔记&#xff0c…

【MySQL 利器之 mysqldump】

文章目录 前言一、mysqldump二、环境三、使用步骤1.服务器与服务器间直接同步2.导出到sql文件3.sql文件导入 总结使用方式 1 服务器间直连方式同步&#xff1a;使用中间SQL 文件方式&#xff1a;datax&#xff1a; 前言 1.随着服务器环境改造&#xff0c;新旧数据库环境更换&a…

微流控压力控制器和微流控注射泵的性能比较

摘要&#xff1a;针对微流控技术中的压力和流量控制&#xff0c;本文介绍了目前常用的两类装置&#xff1a;注射泵和压力泵&#xff0c;重点介绍了这两种装置的性能特点&#xff0c;并对这两种压力控制装置进行了简要的分析对比。分析结论是压力泵将逐渐替代注射泵的应用&#…

Addressable CRC设置详解

设置 Asset Bundle的CRC设置中有三个选项&#xff1a; Disable&#xff1b; Enable,InClude Cached; Enable,Excludeing Cached; 修改后实际改的是这里的选项&#xff1a; Disable 设置为Disable&#xff0c;实际上是将BundledAssetGroupSchema类的UseAssetBundleCrc参数设…

软考:软件工程:软件设计,总体设计,详细设计,耦合内聚流程图,NS图,PAD图,判定树判定图。

软考&#xff1a;软件工程: 提示&#xff1a;系列被面试官问的问题&#xff0c;我自己当时不会&#xff0c;所以下来自己复盘一下&#xff0c;认真学习和总结&#xff0c;以应对未来更多的可能性 关于互联网大厂的笔试面试&#xff0c;都是需要细心准备的 &#xff08;1&#…

【吃透网络安全】2023软考网络管理员考点网络安全(二)网络攻击详解

涉及知识点 黑客的攻击手段介绍&#xff0c;常见的网络攻击&#xff0c;软考网络管理员常考知识点&#xff0c;软考网络管理员网络安全&#xff0c;网络管理员考点汇总。 后面还有更多续篇希望大家能给个赞哈&#xff0c;这边提供个快捷入口&#xff01; 第一节网络管理员考…

多版本管理node.js

多版本管理node.js 1. 安装2. 配置使用2.1 修改node源2.2 常用命令 在Windows 计算机上管理node.js的多个安装版本。 这是朋友推荐的&#xff0c;就是自己在升级node的时候给搞崩了&#xff0c; 不得不提升效率&#xff0c;于是发现了这个好工具&#xff0c;可以反过来理解&…

金蝶云星空RCE漏洞复现

0x01 产品简介 金蝶云星空是一款云端企业资源管理&#xff08;ERP&#xff09;软件&#xff0c;为企业提供财务管理、供应链管理以及业务流程管理等一体化解决方案。金蝶云星空聚焦多组织&#xff0c;多利润中心的大中型企业&#xff0c;以 “开放、标准、社交”三大特性为数字…

端午安康,节日送祝福

端午节是在中国农历的五月份&#xff0c;今年是&#xff08;公历&#xff09;&#xff16;月22日&#xff0c;它是中国最古老的节日 之一&#xff0c;已经有两千多年的历史。The Duanwu or Dragon Boat Festival, is generally celebrated on the fifth month of the Chinese l…

(自定义包导包失败一万次的经验)ModuleNotFoundError: No module named ‘xxx‘

导自定义的包遇到的问题 首先我列一下我的目录结构 |-src|-__init__.py|-Dao|-insertData.py|-pojo|-DataBaseDao|-everyData我现在在我的insertData.py通过from src.pojo import DataBaseDao,everyData导包,在pycharm中可以正常运行,但是在控制台运行失败,提示 ModuleNotFo…

移植微雪例程-2.6寸双色墨水屏到ESP32IDF中

微雪官网&#xff1a;E-Paper ESP32 Driver Board - Waveshare Wiki 去微雪官网&#xff0c;把墨水屏的例程下下来。 我这个使用的是2.6寸的双色墨水屏。 下载下来之后&#xff0c;然后移植到IDF上面。 移植epd2in13bc-demo这个例程。将EPD_2in13bc.h这个头文件中的宽&#…

2022年广西壮族自治区第二届职业技能大赛“网络安全项目”比赛任务书

2022年广西壮族自治区第二届职业技能大赛 “网络安全项目”比赛任务书 一、竞赛时间 总计&#xff1a;12小时 竞赛阶段 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 A A-1 登录安全加固 240分钟 350分 A-2 本地安全策略配置 A-3 流量完整性保护 A-4 事件监控…

算法程序设计 之 背包问题(5/8)

一、实验目的&#xff1a; 理解并掌握利用-动态规划算法与贪心算法求解不同背包问题。 实验内容 0-1背包问题给定n种物品和一个背包。物品i的重量是w[i]&#xff0c;其价值为vi&#xff0c;背包的容量为C。问如何选择装入背包的物品&#xff0c;使得装入背包中物品的总价值最大…

CDH集群离线部署-6.3.1

1. 文件下载 CDH 安装使用到的相关文件&#xff1a;链接: https://pan.baidu.com/s/1xDQD1Sa8s47Qiu_EFYdhUA?pwd5mrt 提取码: 5mrt 2. 机器基础调整 所有机器都需要执行下面所有的步骤。 2.1. 准备机器 至少三台2核16G机器。 2.1.1. ECS 服务器 如果买的是 ECS 服务器&…

Apikit 自学日记:邀请成员一起协作

邀请成员一起协作 在 Apikit 中&#xff0c;所有的 API 接口都是以项目的方式进行管理&#xff0c;因此首先需要创建一个 API 管理项目。 除了创建 API 项目&#xff0c;还可以创建项目组来对项目进行分类或者设置统一的用户操作权限。 一、手动创建项目 进入 API 研发管理项…

有趣的笔试题——贪吃蛇游戏(确定不进来看看?)

原题 贪吃蛇游戏是一款耳熟能详的小游戏&#xff0c;通过上下左右控制蛇的方向&#xff0c;寻找吃的果子&#xff0c;每吃一口果子&#xff0c;蛇的身子会越吃越长&#xff0c;身子越长玩的难度就越大&#xff0c;不能碰墙&#xff0c;不能咬到自己的身体&#xff0c;更不能咬自…

关于springboot中前端传递多个数组以及其他参数及json在前后端转变方法技巧的记录

文末含一种json的动态解析方法,感兴趣的大佬记得看到最后评论交流 * 因公司GIS保密要求仅放部分代码在这儿 前端传输json的方法 json格式不再赘述 常规json传递 使用formData封装传递 好处就是当页面同时传递多个数组甚至是同时传递数组和参数到后台&#xff0c;可以自定义…

2023计算机组成原理复习【1-4】

第一章 计算机系统概述 1&#xff0e;计算机语言的分类&#xff1a;高级语言&#xff0c;低级语言&#xff08;汇编语言与机器语言&#xff09;。P8 高级语言是人类可读写的编程语言。低级语言包括汇编语言和机器语言两种。汇编语言是一种直接使用符号代替计算机指令的语言&a…

C++静态对象的移动问题

7.1返回普通的静态对象 MyString func(const char* p) {static MyString tmp(p);return tmp; } int main() {MyString s1("hello");s1func("helloworld");s1.Print();MyString s2;s2func("hello");s2.Print();return 0; } 结果&#xff1a; 进…