RabbitMQ在SpringBoot中的高级应用(2)

news2024/11/15 10:27:53

过期时间

        1.单独的设置队列的存活时间,队列中的所有消息的过期时间一样

    @Bean//创建交换机
    public DirectExchange ttlQueueExchange(){
        // 交换机名称      是否持久化        是否自动删除
        return new DirectExchange("ttl_queue_log",true,false);
    }

    @Bean//创建队列
    public Queue ttlQueue(){
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl",30000);//表示当前队列的存活时间
        map.put("x-max-length",1000);//最大容量
        map.put("x-min-length",0);//最小容量
        //1.队列名称 2.是否持久化 3.是否唯一绑定某一个交换机  4.是否自动删除 5.给当前的队列配置初始化参数(存活时间,最大容量,最小容量)
        return new Queue("tt_queue",true,false,false,map);
    }

    //将队列和交换机进行绑定
    @Bean
    public Binding bingCreate(){                                                        //绑定的路由键
        return BindingBuilder.bind(ttlQueue()).to(ttlQueueExchange()).with("ttl_queue_key");
    }
   @Test//队列的过期时间
    public void ttlQueue(){
        re.convertAndSend("ttl_queue_log","ttl_queue_key","这是过期的消息");
    }

        消息队列是在接收消息的30s才会过期,当然,这个时间也可以在创建队列的时候更改

            在队列没有失效之前可以看到队列里的消息,过期后查看就是空数据

 

 2. 对每一条消息单独设置过期时间

    @Bean
    public DirectExchange ttlMessageExchange(){
          return new DirectExchange("ttl_message_exchange",true,false);
    }
    @Bean
    public Queue ttlMessage(){
         return new Queue("ttl_message",true,false,false,null);
    }
    @Bean
    public Binding bindingTtl(){
          return BindingBuilder.bind(ttlMessage()).to(ttlMessageExchange()).with("mk");
    }
        注意到这个包:import org.springframework.amqp.core.*;

                在发送消息的时候设置消息的过期时间

    @Test//消息的过期时间   消息
    public void ttlQueueMessage(){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("30000");//设置存活时间 ms
        Message message = new Message("你好,这是消息的过期时间".getBytes(),messageProperties);
        re.convertAndSend("ttl_message_exchange","mk",message);
    }

         在设置过期时间内我们可以在ttl_message队列中查看到消息,等超过该时间也会查询不到消息

 

死信队列

        死信队列和普通的队列一样,只是用来独特的信息,比如:被拒绝接收的消息,未被处理的过期消息,超过最大的存储的消息

        1.编写核心配置文件application.properties

#开启手动签收(手动ACK)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 设置消息被拒绝后,不在重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=false
# 设置消费者需要手动确认消息
spring.rabbitmq.listener.direct.acknowledge-mode=manual

             2.创建交换机和队列并绑定

    @Bean//创建一个死信交换机
    public DirectExchange dealExchange(){
          return new DirectExchange("dead_letter_exchange",true,false,null);
    }
    @Bean//创建死信队列
    public Queue dealQueue(){
          return new Queue("dead_letter",true,false,false,null);
    }
    @Bean//绑定死信队列和死信消息
    public Binding bindDead(){
          return BindingBuilder.bind(dealQueue()).to(dealExchange()).with("dead_log");
    }

    @Bean//创建业务层交换机
    public DirectExchange businessExchange(){
          return new DirectExchange("business_exchange",true,false,null);
    }

    @Bean//普通的队列
    public Queue testQueue(){
          HashMap<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","dead_letter_exchange");//配置死信交换机
        map.put("x-dead-letter-routing-key","dead_log");//设置死信交换机和绑定队列之间的路由键
        return new Queue("test_queue",true,false,false,map);
    }
    @Bean//绑定业务处理机和普通队列
    public Binding testExchange(){
          return BindingBuilder.bind(testQueue()).to(businessExchange()).with("b_refuse");
    }

    @Bean//创建过期消息队列
    public Queue testttlQueue(){
        HashMap<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","dead_letter_exchange");
        map.put("x-dead-letter-routing-key","dead_log");
        map.put("x-message-ttl",30000);
          return new Queue("test_ttl",true,false,false,map );
    }
    @Bean//绑定业务处理机和过期消息队列
    public Binding testttl(){
          return BindingBuilder.bind(testttlQueue()).to(businessExchange()).with("d_ttl");
    }
    @Bean//溢出队列
    public Queue testMaxQueue(){
        HashMap<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","dead_letter_exchange");
        map.put("x-dead-letter-routing-key","dead_log");
        map.put("x-max-length",3);
        return new Queue("test_max",true,false,false,map );
    }
    @Bean//绑定业务处理机和溢出队列
    public Binding testMax(){
          return BindingBuilder.bind(testMaxQueue()).to(businessExchange()).with("t_max");
    }

           3.发送消息

    @Test//死信队列的消息  拒收
    public void refuseMessage(){
        re.convertAndSend("business_exchange","b_refuse","死信队列:消息被拒收");
    }
    @Test//死信队列的消息   过期
    public void refuseMessage1(){
        re.convertAndSend("business_exchange","d_ttl","死信队列:过期的消息");
    }
    @Test//死信队列的消息   溢出
    public void refuseMessage2(){
        for (int i = 1; i < 6; i++) {
            re.convertAndSend("business_exchange","t_max","死信队列:溢出的消息"+i);
        }
    }

         4.处理消息

    @RabbitListener(queues = "test_queue")
    public void refuseConsumer(String msg,Message messagem,Channel channel) throws IOException {
        channel.basicNack(messagem.getMessageProperties().getDeliveryTag(),false,false);
        System.out.println("消息被拒收:"+msg);
    }

             dead_letter死信队列,test_queue拒收队列 , test_ttl过期队列,test_max最大队列

          1拒收消息,我们将消息发送到 test_queue队列,但是消息被拒收,消息就会出现在死信队列中,所以我们死信队列中的信息为1,而拒绝队列为0

         2.过期消息,在消息没有过期的时候消息在test_ttl队列中,等到有效期结束后就会进入到死信队列

 

         3.消息溢出

 

延迟队列

        在队列中的消息是不需要立即消费的,需要等待一段时间时候才会取出消费,通过死信队列进行中转

        1.创建队列并绑定死信队列

    @Bean//创建延迟消息队列
    public Queue testDelayQueue(){
        HashMap<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","dead_letter_exchange");
        map.put("x-dead-letter-routing-key","dead_log");
        map.put("x-message-ttl",30000);
        return new Queue("test_deal",true,false,false,map );
    }
    @Bean//绑定业务处理机和过期延迟队列
    public Binding testdel(){
          return BindingBuilder.bind(testDelayQueue()).to(businessExchange()).with("d_deal");
    }

        2.发送消息

    @Test//延迟消息
    public void dealMessage(){
          re.convertAndSend("business_exchange","d_deal","延迟的消息:生于小满,小满为安" );
    }

        3.处理消息

    @RabbitListener(queues = "dead_letter")//监听死信队列
    public void deadLetter(Message message,Channel channel){
        byte[] body = message.getBody();

        System.out.println("延迟消息:" +new String(body));
    }

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/735959.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Swagger3学习笔记

参考https://blog.csdn.net/YXXXYX/article/details/124952856 https://blog.csdn.net/m0_53157173/article/details/119454044 引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifact…

云之道知识付费V2小程序V3.1.1独立平台版安装使用教程

据播播资源了解&#xff0c;云之道知识付费小程序是一款专注于知识付费的小程序源码&#xff0c;为内容创业者、自媒体和教育培训机构提供全方位的互联网解决方案。 由播播资源小编全套安装云之道知识付费V2独立版系统&#xff0c;系统支持无限多开&#xff0c;相比上几版出现…

如何创建电子商务平台

创建一个电商平台已经变得简单而又高效&#xff0c;其中乔拓云作为一家专业的第三方平台&#xff0c;提供了丰富的模板和工具&#xff0c;帮助商家快速搭建自己的电商平台。本文将介绍如何通过乔拓云平台创建电商平台&#xff0c;并提供一些建议和注意事项。 首先&#xff0c;商…

wincc项目中VBS脚本密码的研究

文章目录 前言一、分析二、验证及使用 前言 很多时候我们在wincc中写全局脚本时会为自己的脚本添加密码&#xff0c;但很久很久以后再想修改密码忘记了怎么办呢。 一、分析 经过分析v7.0,v7.3,v7.5密码稍有不同&#xff0c;但同样最多可以设置21位的密码。 二、验证及使用

支持向量机(SVM)

目录 1 引言 2 支持向量机的理论基础 1. 什么是支持向量&#xff0c;它们在模型中的作用是什么&#xff1f; 2. 线性支持向量机的数学原理 3. 解释如何通过核技巧来处理非线性问题 4. 支持向量机的优点和局限 3 支持向量机的实践 1. 如何使用Python的sklearn库创建和训练…

【计算机组成与体系结构Ⅰ】实验4 存储器原理实验

一、实验目的 1&#xff1a;了解双端口静态存储器IDT7132的工作特性及使用方法。 2&#xff1a;了解半导体存储器如何读写。 二、实验总结 0&#xff1a;实验内容 双端口存储器RAM&#xff1a; 左端口的数据部分与数据总线DBUS7-DBUS0相联&#xff1b;右端口的数据引脚与指…

【云原生】在DACS沙箱内配置Telepresence工具(社区版限制5人连接)

使用 Telepresence 映射 Kubernetes 服务到 DACS 沙箱内 Telepresence 属于沙箱工具,它可以把我们在AWS云平台的EKS(Kubernetes)上部署的服务映射到本地,这样大家就可以在本地访问EKS(Kubernetes)上的资源了,如注册中心Nacos、Redis、Kafka等服务 安装 Telepresence 安装…

使用最小二乘进行多项式曲线拟合

目录 写在前面曲线拟合方法pcl实现的b样条曲线拟合最小二乘曲线拟合原理代码注&#xff1a;结果 参考完 写在前面 1、本文内容 使用Eigen进行最小二乘拟合曲线 2、平台/环境 Eigen(open3d), cmake, pcl 3、转载请注明出处&#xff1a; https://blog.csdn.net/qq_41102371/ar…

【Git】Github 上传文件到远程仓库时,经常发生网络错误,一个比较稳定的连接方法及我的示例

文章目录 一、问题导读二、完整的一个流程2.1 初始化2.2 从远程仓库拉取最新的更改并合并到当前分支2.3 远程仓库的 SSH URL2.4 添加到暂存区2.5 提交操作2.6 将一个远程仓库添加为 Git 仓库的远程别名2.7 推送到远程仓库2.8 最后的结果 三、HTTP和SSH的理解3.1 两者的区别3.1.…

【网络系统集成】网络认证实验

1.实验名称 网络认证实验 2.实验目的 学习网络认证配置 3.实验内容 3.1拓扑结构图 3.2地址分配 <

OpenCV实战(28)——光流估计

OpenCV实战&#xff08;28&#xff09;——光流估计 0. 前言1. 光流估计原理2. 光流算法实现3. 完整代码小结系列链接 0. 前言 当相机进行拍摄时&#xff0c;拍摄到的亮度图案会投射到图像传感器上&#xff0c;从而形成图像。在视频序列中&#xff0c;我们通常需要捕捉运动模式…

HCIP--OSPF实验1

1、合理规划IP地址&#xff0c;启用ospf单区域 2、R1-R2之间启用ppp的单向认证 3、R2-R3之间启用ppp的chap认证 4、R3-R5-F6之间使用MGRE&#xff0c;R3为hub端&#xff0c;R5,R6为spoke端&#xff1b; 要求MGRE接口网络型为BMA&#xff0c;spoke之间通信必须经过hub端 5、全…

Linux--进程

什么叫做进程&#xff1f; 程序加载到内存就叫进程&#xff08;看不懂是吧&#xff0c;看下面更详细一些&#xff09; 进程对应的代码和数据进程对应的PCB结构体

MySQL索引原理和优化

目录 1 什么是索引&#xff1f;1.1 引言1.2 索引原理1.3 索引分类1.3.1 主键索引1.3.2 普通索引&#xff08;单列索引&#xff09;1.3.3 复合索引&#xff08;组合索引&#xff09;1.3.4 唯一索引1.3.5 全文索引1.3.6 索引的查询和删除 1.4 索引的优缺点 2 索引数据结构2.1 Has…

做题遇见的PHP函数汇总

mb_substr函数 mb_substr() 函数返回字符串的一部分&#xff0c;之前学过 substr() 函数&#xff0c;它只针对英文字符&#xff0c;如果要分割的中文文字则需要使用 mb_substr() 语法&#xff1a; mb_substr ( $str ,$start [, $length NULL [, $encoding mb_encoding() ]] …

改进版简化路径。

美图 在原有的基础上增加对 cd - 的处理。 在 Unix 命令中&#xff0c;cd - 表示返回上一次所在的目录。我们可以使用一个变量来记录上一次所在的目录&#xff0c;在遇到 cd - 时将当前目录设置为上一次所在的目录。 以下是增加对 cd - 的处理后的代码&#xff1a; 4 cd /…

016 - STM32学习笔记 - SPI读写FLASH(一)

016 - STM32学习笔记 - SPI访问Flash&#xff08;一&#xff09; 之前csdn的名称是宥小稚&#xff0c;后来改成放学校门口见了&#xff0c;所以前面内容看到图片水印不要在意&#xff0c;都是自己学习过程中整理的&#xff0c;不涉及版权啥的。 1、什么是SPI&#xff1f; SP…

Linux项目自动化构建工具-make/Makefile以及git三板斧

目录 一、关于make/makefile的背景知识二、依赖关系和依赖方法三、make/makefile如何书写&#xff1f;四、文件的三个时间(Access、Modify、Change)五、Linux下倒计时和进度条代码的书写5.1 回车换行5.2 缓冲区5.3 倒计时代码实现5.4 进度条代码实现 六、git三板斧6.1 什么是gi…

10.15资源加载

定义&#xff1a; 1.直接属性引用 生成一个actor和声音&#xff1a; 运行时就会产生一个Myactor中设置的Actor并且播放Myactor中设置的声音。 音频&#xff1a;class USoundCue&#xff1b; 纹理&#xff1a;class UTexture&#xff1b; 材质&#xff1a; class UMaterial 模…

TCP/IP出现的背景及其历史【图解TCP/IP(笔记八)】

文章目录 TCP/IP出现的背景及其历史从军用技术的应用谈起ARPANET的诞生TCP/IP的诞生UNIX系统的普及与互联网的扩张商用互联网服务的启蒙 TCP/IP出现的背景及其历史 从军用技术的应用谈起 20世纪60年代&#xff0c;很多大学和研究机构都开始着力于新的通信技术。其中有一家以美…