【SpringBoot】 整合RabbitMQ 消息单独以及批量的TTL

news2025/1/9 16:23:06

生产者端

目录结构

导入依赖

修改yml

业务逻辑

        队列消息过期

        消息单独过期


   

        TTL(Time To Live)存活时间。表示当消息由生产端存入MQ当中的存活时间,当时间到达的时候还未被消息就会被自动清除。RabbitMQ可以对消息单独设置过期时间也可以对整个队列(并不是队列,而是队列中的消息)设置过期时间。

生产者端

目录结构

导入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.5.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

修改yml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    #三个类型:none默认不开启确认回调 correlated开启确认回调
    #simple也会确认回调 还会调用waitForConfirms()方法或waitForConfirmsOrDie()方法
    publisher-confirm-type: correlated # 开启确认回调
    publisher-returns: true # 开启退回回调

业务逻辑

        队列消息过期

        第一段代码即是定义交换机与队列的名称并使其进行绑定,仅是一个配置类的效果。第二段代码就是生产者产生消息的方法,只需要在意for循环里面的逻辑即可。而图1即是创建出来的队列以及生产的10条消息,在10s会自动删除。因为在配置类中已经定义了TTL。

/**
 * 定义交换机与队列的Bean 并且使之绑定
 */
@Component
public class RabbitMQConfig {

    public static final String TTL_EXCHANGE_NAME = "ttl_exchange_name";
    public static final String TTL_QUEUE_NAME = "ttl_queue_name";

    @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.topicExchange(TTL_EXCHANGE_NAME).durable(true).build();
    }

    //配置队列的时候顺带上ttl()方法 其内部对MQ设置了参数"x-message-ttl"
    //注意这里的单位是毫秒 所以我写的参数为10000毫秒即10秒
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(TTL_QUEUE_NAME).ttl(10000).build();
    }

    @Bean
    public Binding ttl(@Qualifier("ttlExchange") Exchange exchange,
                       @Qualifier("ttlQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}
@SpringBootTest
@RunWith(SpringRunner.class)
class RabbitmqProducerApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testTTL(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if(b) System.out.println("交换机成功接受到了消息");
                else System.out.println("消息失败原因" + s);
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("队列接受不到交换机的消息进行了失败回调");
            }
        });
        // 以上代码只是保证消息传递的可靠性 与TTL无关
        for(int i = 0; i < 10; ++i){
            rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME,"test.heHe","HelloWorld");
        }
    }
}

                                                                 图1

        消息单独过期

         这里的配置类还是如上相同:队列中的消息10s自动过期,再对其中一条消息进行处理就可以更好的明白这两种过期的区别:其中一条消息设置了5s自动过期,通过图2可以发现队列中有11条消息,当5s后变为10条消息,再过了5s后就没有消息。

@SpringBootTest
@RunWith(SpringRunner.class)
class RabbitmqProducerApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testTTL(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if(b) System.out.println("交换机成功接受到了消息");
                else System.out.println("消息失败原因" + s);
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("队列接受不到交换机的消息进行了失败回调");
            }
        });
        // 以上代码只是保证消息传递的可靠性 与TTL无关
        // 消息的后处理对象 设置一些消息的参数信息
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");//设置消息对象5s后过期
                return message;
            }
        };
        //消息单独5s过期
        rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME,"test.heHe","HelloWorld",messagePostProcessor);
        //队列中的消息全体10s过期
        for(int i = 0; i < 10; ++i){
            rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME,"test.heHe","HelloWorld");
        }
    }
}

                                                                 图2

        到这真的就以为结束了吗,当我把for循环的10条消息放到单独过期的消息上面,发现了新大陆:一开始的消息也是如图2所示为11条,但是但是,过了5s后并不会消除一条消息,而是过了10s后将11条全部删除了。于是我猜测这个消息的存放队列就好似一个栈,虽然先生产的消息的生存时间短,但是当别的消息压在自己头上的时候是出不去的,而是必须等自己为栈顶元素才可以出栈!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/482879.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

爬虫 - QS世界大学排名数据

爬虫 - QS世界大学排名数据 网站简介爬虫方法概述使用工具爬虫概述 第一部分导入需要用到的python包设置selenium控制浏览器打开网页控制鼠标操作定位节点 提取数据滚轮翻页构建循环自动爬取数据数据储存 第二部分导入需要用到的python包获取网页设置请求头读取链接获取网页信息…

TIM-定时器——STM32

TIM-定时器——STM32 TIM(Timer)定时器 定时器可以对输入的时钟进行计数&#xff0c;并在计数值达到设定值时触发中断 16位计数器、预分频器、自动重装寄存器的时基单元&#xff0c;在72MHz计数时钟下可以实现最大59.65s的定时 不仅具备基本的定时中断功能&#xff0c;而且还包…

K8S第二讲 Kubernetes集群简易版搭建步骤

Kubernetes集群搭建步骤 1&#xff1a;准备物理或虚拟机器 为Kubernetes集群准备物理或虚拟机器。至少需要一个控制节点&#xff08;Master Node&#xff09;和一个工作节点&#xff08;Worker Node&#xff09;&#xff0c;建议使用Linux操作系统。 2&#xff1a; 安装Dock…

1987-2021年全国各省进出口总额数据含进口总额和出口总额

1987-2021年全国各省进出口总额数据含进口和出口 1、时间&#xff1a;1987-2021年 2、范围&#xff1a;包括全国30个省不含西藏 3、指标&#xff1a;进出口总额、进口总额、出口总额 4、单位&#xff1a;万美元 5、来源&#xff1a;各省NJ、JIN rong统计NJ 6、缺失情况说…

递归算法及经典例题详解

大部分人在学习编程时接触的第一个算法应该就是递归了&#xff0c;递归的思想其实很好理解&#xff0c;就是将一个问题拆分为若干个与本身相似的子问题&#xff0c;通过不断调用自身来求解。 但很多新手在实际操作中却很难正确使用到递归&#xff0c;有时面对问题还会有种无从…

win7下java环境搭建以及jdk环境变量配置

很多人在搭建页游、手游时候经常遇到JAVA闪退,基本都是环境变量或者路径错误导致的。本章节主要讲解在win7系统环境下,java环境变量配置方法,java环境配置正确,才可以对apk程序进行反编译运行页游手游。其他操作系统环境变量大同小异参考下就会了。 安装教程: 1、直接运…

让语言学习更简单的 WordFlow

作为一个英语并不是那么特别好的计算机专业学生&#xff0c;长期积累英语的学习对个人发展还是有意义的。简单来说&#xff0c;我在语言上最大的两个问题&#xff0c;一个自己「不理解」&#xff0c;另一个是自己「不会表达」。 上述两个问题主要体现在口语层面&#xff0c;而…

1997-2021年全国30省技术市场成交额(亿元)

1997-2021年全国30省技术市场成交额 1、时间&#xff1a;1997-2021年 2、范围&#xff1a;30省不含西藏 3、来源&#xff1a;国家统计J 4、指标&#xff1a;技术市场成交额 5、缺失情况说明&#xff1a;无缺失 6、指标解释及用途&#xff1a; 技术市场成交额是一个客观、…

YOLOv5 训练自己的数据集

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客 &#x1f356; 原作者&#xff1a;K同学啊|接辅导、项目定制 ● 难度&#xff1a;夯实基础⭐⭐ ● 语言&#xff1a;Python3、Pytorch3 ● 时间&#xff1a;5月1日-5月6日 &#x1f37a;要求&#xff1…

基于C#开发 B/S架构的实验室管理系统 云LIS系统(MVC + SQLserver + Redis)

一、云LIS系统是将各种样本、免疫、临检、放免、及实验用的分析仪器&#xff0c;通过网络管理和传输实验分析过程中全部数据。对每一专业&#xff0c;实现检验申请、样本采集、样本核收、联机检验、质量控制、报告审核到报告发布的全环节的信息化管理平台。 二、基于B/S架构的云…

【SpringMVC】| SpringMVC注解式开发

目录 一&#xff1a;SpringMVC注解式开发 1. RequestMapping定义请求规则 2. 五种数据提交的方式 3. 请求参数中文乱码解决 4. action方法的返回值 5. SpringMVC的四种跳转方式 6. SpringMVC支持的默认参数类型 7. 日期处理 8. 标签的使用 9. 资源在WEB-INF目录下 一…

常见三种编码方式

常见三种编码方式 1. one-hot 编码2. 虚拟编码3. 效果编码 最近复习一些书&#xff0c;记录一下。在特征工程中&#xff0c;数据集经常会出现分类变量&#xff0c;这时候的分类变量可能是字符型&#xff0c;通常不能直接用于训练模型&#xff0c;这时需要对分类变量进行编码&am…

JavaWeb《CSS》

本笔记学习于Acwing平台 目录 1. 样式定义方式 2.1 行内样式表&#xff08;inline style sheet&#xff09; 2.2 内部样式表&#xff08;internal style sheet&#xff09; 2.3 外部样式表&#xff08;external style sheet&#xff09; 2. 选择器 2.1 标签选择器 2.2 …

ChatGPT服务器配置部署-chatGPT国内入口搭建

chatGPT国内入口 ChatGPT是由OpenAI公司开发的一种自然语言生成模型&#xff0c;国内入口一般是通过API接口或者SDK对接实现的。具体的对接方式可以参考以下步骤&#xff1a; 了解ChatGPT的API接口或者SDK: 首先需要了解ChatGPT提供的API接口或者SDK&#xff0c;包括使用方式、…

文件上传漏洞靶场

目录 第一关 源码 前端 后端 代码审计 前端 后端 绕过原理 抓包后未修改 抓包后修改且文件上传成功 第二关 源码 后端 代码审计 绕过原理 抓包后未修改 抓包后修改且文件上传成功 ​编辑 第三关 源码 后端 代码审计 绕过原理 第四关 源码 后端 代码审…

linux以太网(二)

内核版本&#xff1a;linux-3.14.16 基于imx6 一、文件fec_main.c分析 路径&#xff1a;drivers\net\ethernet\freescale\fec_main.c 1、platform总线 标准的平台总线使用方式 设备树匹配 设备树节点 2、平台总线probe 1&#xff09;分配net_device相关结构 分配 与平…

单源最短路问题

全部代码 全部代码在github acwing 上 正在更新 https://github.com/stolendance/acwing 图论 欢迎大家star与fork 单源最短路问题 先用spfa算法 不行再换其他的 spfa-超级万能 说不定比dijsktra还快 dis[] 代表第k到某一点的最短距离 queue 代表刚被更新的点 它有可能更…

【Java校招面试】基础知识(三)——多线程与并发

目录 前言一、基础概念二、互斥锁三、Java内存模型&#xff08;JMM&#xff09;四、线程池后记 前言 本篇主要介绍Java多线程与并发相关内容。 “基础知识”是本专栏的第一个部分&#xff0c;本篇博文是第三篇博文&#xff0c;如有需要&#xff0c;可&#xff1a; 点击这里&a…

每日一题——反转字符串—II

每日一题 反转字符串——II 题目链接 思路 我们先来举几个例子来理解题目意思 字符串“ abcdefgh ”&#xff0c;k 2&#xff0c;那么依据题目意思&#xff0c;反转后的字符串应该是“ bacdfegh ”&#xff08;即每2k个字符&#xff0c;就反转前k个字符&#xff0c;且无剩余…

基于 Python+Flask+SQLite 的网易云音乐评论情感分析系统

基于 PythonFlaskMySQL 的网易云音乐评论情感分析系统&#xff0c;采用Echart构建图表&#xff0c;支持一键切换颜色主题&#xff0c;通过连接数据库获取评论数据。对失效的爬虫代码进行了更新&#xff0c;可通过歌曲id_半_自动获取评论&#xff0c;具体可以看下方的过程展示。…