Rabbitmq延迟消息

news2024/12/24 21:26:37

目录

  • 一、延迟消息
    • 1.基于死信实现延迟消息
      • 1.1 消息的TTL(Time To Live)
      • 1.2 死信交换机 Dead Letter Exchanges
      • 1.3 代码实现
    • 2.基于延迟插件实现延迟消息
      • 2.1 插件安装
      • 2.2 代码实现
    • 3.基于延迟插件封装消息

一、延迟消息

延迟消息有两种实现方案:
1,基于死信队列
2,集成延迟插件

1.基于死信实现延迟消息

使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:
消息的TTL(存活时间)和死信交换机Exchange,通过这两者的组合来实现延迟队列

1.1 消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
如何设置TTL:
我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在压在这个队列的消息在5秒后会消失。

1.2 死信交换机 Dead Letter Exchanges

一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
(1) 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
(2)上面的消息的TTL到了,消息过期了。
(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
在这里插入图片描述
我们现在可以测试一下延迟队列。
(1)创建死信队列
(2)创建交换机
(3)建立交换器与队列之间的绑定
(4)创建队列

1.3 代码实现

在service-mq 中添加配置类

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterMqConfig {
    // 声明一些变量

       public static final String exchange_dead = "exchange.dead";
    public static final String routing_dead_1 = "routing.dead.1";
    public static final String routing_dead_2 = "routing.dead.2";
    public static final String queue_dead_1 = "queue.dead.1";
    public static final String queue_dead_2 = "queue.dead.2";

    // 定义交换机
    @Bean
    public DirectExchange exchange(){
        return new DirectExchange(exchange_dead,true,false,null);
    }

    @Bean
    public Queue queue1(){
        // 设置如果队列一 出现问题,则通过参数转到exchange_dead,routing_dead_2 上!
        HashMap<String, Object> map = new HashMap<>();
        // 参数绑定 此处的key 固定值,不能随意写
        map.put("x-dead-letter-exchange",exchange_dead);
        map.put("x-dead-letter-routing-key",routing_dead_2);
        // 设置延迟时间
        map.put("x-message-ttl ", 10 * 1000);
        // 队列名称,是否持久化,是否独享、排外的【true:只可以在本次连接中访问】,是否自动删除,队列的其他属性参数
        return new Queue(queue_dead_1,true,false,false,map);
    }

    @Bean
    public Binding binding(){
        // 将队列一 通过routing_dead_1 key 绑定到exchange_dead 交换机上
        return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);
    }

    // 这个队列二就是一个普通队列
    @Bean
    public Queue queue2(){
        return new Queue(queue_dead_2,true,false,false,null);
    }

    // 设置队列二的绑定规则
    @Bean
    public Binding binding2(){
        // 将队列二通过routing_dead_2 key 绑定到exchange_dead交换机上!
        return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);
    }
}

配置发送消息

@RestController
@RequestMapping("/mq")
@Slf4j
public class MqController {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Autowired
   private RabbitService rabbitService;

 @GetMapping("sendDeadLettle")
   public Result sendDeadLettle() {
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     this.rabbitTemplate.convertAndSend(DeadLetterMqConfig.exchange_dead, DeadLetterMqConfig.routing_dead_1, "ok");
      System.out.println(sdf.format(new Date()) + " Delay sent.");
      return Result.ok();
   }
}

消息接收方

@Component
public class DeadLetterReceiver {


    @RabbitListener(queues = DeadLetterMqConfig.queue_dead_2)
    public void getMessage(String msg, Message message, Channel channel) throws IOException {
        //时间格式化
        SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss");

        System.out.println("消息接收的时间:\t"+simpleDateFormat.format(new Date()));

        System.out.println("消息的内容"+msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

    }
}

在这里插入图片描述

2.基于延迟插件实现延迟消息

2.1 插件安装

Rabbitmq实现了一个插件x-delay-message来实现延时队列

  1. 首先我们将刚下载下来的rabbitmq_delayed_message_exchange-3.9.0.ez文件上传到RabbitMQ所在服务器,下载地址:https://www.rabbitmq.com/community-plugins.html
  2. 切换到插件所在目录,执行 docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins 命令,将刚插件拷贝到容器内plugins目录下
  3. 执行 docker exec -it rabbitmq /bin/bash 命令进入到容器内部,并 cd plugins 进入plugins目录
  4. 执行 ls -l|grep delay 命令查看插件是否copy成功
  5. 在容器内plugins目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令启用插件
  6. exit命令退出RabbitMQ容器内部,然后执行 docker restart rabbitmq 命令重启RabbitMQ容器

2.2 代码实现

配置队列

@Configuration
public class DelayedMqConfig {

    public static final String exchange_delay = "exchange.delay";
    public static final String routing_delay = "routing.delay";
    public static final String queue_delay_1 = "queue.delay.1";

     @Bean
    public Queue delayQeue1() {
        // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
        return new Queue(queue_delay_1, true);
    }

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding delayBbinding1() {
        return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();
    }
}

发送消息

@GetMapping("sendelay")
public Result sendDelay() {
   SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
   this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, sdf.format(new Date()), new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
         message.getMessageProperties().setDelay(10 * 1000);
         System.out.println(sdf.format(new Date()) + " Delay sent.");
         return message;
      }
   });
   return Result.ok();
}

接收消息

@Component
public class DelayReceiver {

    @RabbitListener(queues = DelayedMqConfig.queue_delay_1)
    public void get(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Receive queue_delay_1: " + sdf.format(new Date()) + " Delay rece." + msg);
    }

}

3.基于延迟插件封装消息

/**
 * 封装发送延迟消息方法
 * @param exchange
 * @param routingKey
 * @param msg
 * @param delayTime
 * @return
 */
public Boolean sendDelayMsg(String exchange,String routingKey, Object msg, int delayTime){
    //  将发送的消息 赋值到 自定义的实体类
    GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();
    //  声明一个correlationId的变量
    String correlationId = UUID.randomUUID().toString().replaceAll("-","");
    gmallCorrelationData.setId(correlationId);
    gmallCorrelationData.setExchange(exchange);
    gmallCorrelationData.setRoutingKey(routingKey);
    gmallCorrelationData.setMessage(msg);
    gmallCorrelationData.setDelayTime(delayTime);
    gmallCorrelationData.setDelay(true);

    //  将数据存到缓存
    this.redisTemplate.opsForValue().set(correlationId,JSON.toJSONString(gmallCorrelationData),10,TimeUnit.MINUTES);

    //  发送消息
    this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,message -> {
        //  设置延迟时间
        message.getMessageProperties().setDelay(delayTime*1000);
        return message;
    },gmallCorrelationData);

    //  默认返回
    return true;
}

修改retrySendMsg方法 – 添加判断是否属于延迟消息

//  判断是否属于延迟消息
if (gmallCorrelationData.isDelay()){
    //  属于延迟消息
    this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),message -> {
        //  设置延迟时间
        message.getMessageProperties().setDelay(gmallCorrelationData.getDelayTime()*1000);
        return message;
    },gmallCorrelationData);
}else {
    //  调用发送消息方法 表示发送普通消息  发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法
    this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);
}

利用封装好的工具类 测试发送延迟消息

//  基于延迟插件的延迟消息
@GetMapping("sendDelay")
public Result sendDelay(){
    //  声明一个时间对象
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println("发送时间:"+simpleDateFormat.format(new Date()));
    this.rabbitService.sendDelayMsg(DelayedMqConfig.exchange_delay,DelayedMqConfig.routing_delay,"iuok",3);
    return Result.ok();
}

重试了4次,所以我们需要保证幂等性
在这里插入图片描述
结果会 回发送三次,也被消费三次!
如何保证消息幂等性?
1.使用数据方式
2.使用redis setnx 命令解决 — 推荐

@SneakyThrows
@RabbitListener(queues = DelayedMqConfig.queue_delay_1)
public void getMsg2(String msg,Message message,Channel channel){

    //  使用setnx 命令来解决 msgKey = delay:iuok
    String msgKey = "delay:"+msg;
    Boolean result = this.redisTemplate.opsForValue().setIfAbsent(msgKey, "0", 10, TimeUnit.MINUTES);
    //  result = true : 说明执行成功,redis 里面没有这个key ,第一次创建, 第一次消费。
    //  result = false : 说明执行失败,redis 里面有这个key
    //  不能: 那么就表示这个消息只能被消费一次!  那么第一次消费成功或失败,我们确定不了!  --- 只能被消费一次!
        //        if (result){
        //            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        //            System.out.println("接收时间:"+simpleDateFormat.format(new Date()));
        //            System.out.println("接收的消息:"+msg);
        //            //  手动确认消息
        //            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        //        } else {
        //          //    不能消费!
        //        }
    //  能: 保证消息被消费成功    第二次消费,可以进来,但是要判断上一个消费者,是否将消息消费了。如果消费了,则直接返回,如果没有消费成功,我消费。
    //  在设置key 的时候给了一个默认值 0 ,如果消费成功,则将key的值 改为1
    if (!result){
        //  获取缓存key对应的数据
        String status = (String) this.redisTemplate.opsForValue().get(msgKey);
        if ("1".equals(status)){
            //  手动确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            return;
        } else {
            //  说明第一个消费者没有消费成功,所以消费并确认
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            System.out.println("接收时间:"+simpleDateFormat.format(new Date()));
            System.out.println("接收的消息:"+msg);
            //  修改redis 中的数据
            this.redisTemplate.opsForValue().set(msgKey,"1");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            return;
        }
    }
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println("接收时间:"+simpleDateFormat.format(new Date()));
    System.out.println("接收的消息:"+msg);

    //  修改redis 中的数据
    this.redisTemplate.opsForValue().set(msgKey,"1");
    //  手动确认消息
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/878954.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PyQt5下拉列表实现及信号与槽的连接

目录 1、常用方法 2、常用信号 3、实操 1、常用方法 QComboBox() 创建一个下拉框对象addItems 可以使用列表进行多个下拉框内容添加, 单个添加用addItemcurrentIndexChanged 是用来获取当前选择下拉框的索引, 这也是这个"信号"槽函数需要 有个索引传参, 这样就便…

优化查询性能:UNION与UNION ALL的区别

作用 在SQL查询中&#xff0c;当我们需要合并多个查询结果集时&#xff0c;我们通常会使用UNION和UNION ALL操作符&#xff0c;同时&#xff0c;如果你写的or语句不走索引&#xff0c;可以考虑使用UNION、UNION ALL优化。 在本篇博客中&#xff0c;我们将探讨UNION和UNION AL…

Lnux系统usb摄像头测试程序(一)

这是linux或国产系统上&#xff08;UOS、麒麟&#xff09;USB摄像头的测试程序&#xff0c;主要功能有 1、系统上USB摄像头的配置查询&#xff0c;包括摄像头支持的协议&#xff0c;支持的分辨率等信息 、视频预览&#xff08;支持yuv422和RGB两种格式 3、录像录音 4、视频编码…

值传递、形参实参的关系、地址传递(指针和函数)

1 值传递 实现两个数字进行交换&#xff0c;代码和运行结果如下所示&#xff1a; #include<iostream> using namespace std;void change(int a, int b) {int temp a;a b;b temp;cout << "change a " << a << endl;cout << "…

raw socket是啥(一)?

对于网络通讯&#xff0c;耳熟能详的莫过于TCP、UDP&#xff0c;二者皆需要ip和port。对于一般开发人员&#xff0c;找到一个“能用”的库就可以了&#xff0c;因为流式通讯&#xff0c;会有粘包问题&#xff0c;那就需要再加一个库&#xff0c;解决粘包问题&#xff0c;这样一…

【量化课程】02_4.数理统计的基本概念

2.4_数理统计的基本概念 数理统计思维导图 更多详细内容见notebook 1.基本概念 总体&#xff1a;研究对象的全体&#xff0c;它是一个随机变量&#xff0c;用 X X X表示。 个体&#xff1a;组成总体的每个基本元素。 简单随机样本&#xff1a;来自总体 X X X的 n n n个相互…

大数据平台中元数据库—MySQL的异常故障解决

本文的主要目标是解决大数据平台中元数据库MySQL的异常故障。通过分析应用响应缓慢的问题&#xff0c;找到了集群组件HIVE和元数据库MySQL的原因。通过日志分析、工具检测和专家指导等一系列方法&#xff0c; 最终确定问题的根源是大数据集群中租户的不规范使用所导致&#xff…

冠达管理:机构密集调研医药生物股 反腐政策影响受关注

进入8月&#xff0c;跟着反腐事件发酵&#xff0c;医药生物板块呈现震荡。与此一起&#xff0c;组织出资者对该板块上市公司也展开了密集调研。 到昨日&#xff0c;8月以来就有包含南微医学、百济神州、维力医疗、方盛制药等12家医药生物板块的上市公司接受组织调研&#xff0c…

【0810作业】Linux中基于UDP的TFTP文件传输(下载、上传)

一、tftp协议概述 简单文件传输协议&#xff0c;适用于在网络上进行文件传输的一套标准协议&#xff0c;使用UDP传输。 特点&#xff1a; ① 是应用层协议 ② 基于UDP协议实现 ③ 数据传输模式 octet&#xff1a;二进制模式&#xff08;常用&#xff09;mail&#xff1a;已经不…

易服客工作室:PressMart – 现代Elementor WooCommerce WordPress商城主题

PressMart是现代且独特的 Elementor WooCommerce WordPress商城主题。它配备了高品质的 05 预建主页&#xff0c;适合任何在线商店&#xff0c;如时装店、电子产品商店、家具店等。 我们使用 Elementor – 一个拖放页面构建器&#xff0c;不需要用户的编码技能即可轻松编辑和构…

图解WebSocket

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱写博客的嗯哼&#xff0c;爱好Java的小菜鸟 &#x1f525;如果感觉博主的文章还不错的话&#xff0c;请&#x1f44d;三连支持&#x1f44d;一下博主哦 &#x1f4dd;个人博客&#xff1a;敬请期待 文章目录 前言一、…

使用phpstorm开发调试thinkphp

1.环境准备 1.开发工具下载&#xff1a;PhpStorm: PHP IDE and Code Editor from JetBrains 2.PHP下载&#xff1a;PHP: Downloads 3. PHP扩展&#xff1a;PECL :: Package search 4.用与调试的xdebug模块&#xff1a; Xdebug: Downloads xdebug模块&#xff0c;如果是php8以…

分类预测 | MATLAB实现GWO-BiLSTM-Attention多输入分类预测

分类预测 | MATLAB实现GWO-BiLSTM-Attention多输入分类预测 目录 分类预测 | MATLAB实现GWO-BiLSTM-Attention多输入分类预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.GWO-BiLSTM-Attention 数据分类预测程序 2.代码说明&#xff1a;基于灰狼优化算法&#xff08…

【JVM】对String::intern()方法深入详解(JDK7及以上)

文章目录 1、什么是intern&#xff1f;2、经典例题解释例1例2例3 1、什么是intern&#xff1f; String::intern()是一个本地方法&#xff0c;它的作用是如果字符串常量池中已经包含一个等于此String对象的字符串&#xff0c;则返回代表池中这个字符串的String对象的引用&#…

Redis缓存读写策略(三种)数据结构(5+3)

Redis缓存读写策略&#xff08;三种&#xff09; Cache Aside Pattern&#xff08;旁路缓存模式&#xff09; Cache Aside Pattern 是我们平时使用比较多的一个缓存读写模式&#xff0c;比较适合读请求比较多的场景。 写&#xff1a; 先更新 db然后直接删除 cache 。 读 : …

爆肝整理,性能测试方法与关键指标以及瓶颈定位思路,一篇贯通...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 性能测试方法 1、…

Mybatis 源码 ③ :SqlSession

一、前言 Mybatis 官网 以及 本系列文章地址&#xff1a; Mybatis 源码 ① &#xff1a;开篇Mybatis 源码 ② &#xff1a;流程分析Mybatis 源码 ③ &#xff1a;SqlSessionMybatis 源码 ④ &#xff1a;TypeHandlerMybatis 源码 ∞ &#xff1a;杂七杂八 在 Mybatis 源码 ②…

ubuntu 部署 ChatGLM-6B 完整流程 模型量化 Nvidia

ubuntu 部署 ChatGLM-6B 完整流程 模型量化 Nvidia 初环境与设备环境准备克隆模型代码部署 ChatGLM-6B完整代码 ChatGLM-6B 是一个开源的、支持中英双语的对话语言模型&#xff0c;基于 General Language Model (GLM) 架构&#xff0c;具有 62 亿参数。结合模型量化技术&#x…

Go学习-Day1

Go学习-Day1 个人博客&#xff1a;CSDN博客 打卡。 Go语言的核心开发团队&#xff1a; Ken Thompson (C语言&#xff0c;B语言&#xff0c;Unix的发明者&#xff0c;牛人)Rob Pike(UTF-8发明人)Robert Griesemer(协助HotSpot编译器&#xff0c;Js引擎V8) Go语言有静态语言的…

Sentinel使用实例

不说了&#xff0c;直接上官方文档 https://github.com/alibaba/spring-cloud-alibaba/blob/master/spring-cloud-alibaba-examples/sentinel-example/sentinel-core-example/readme-zh.md Sentinel Example 项目说明 本项目演示如何使用 Sentinel starter 完成 Spring Clo…