RabbitMQ高级特性2 、TTL、死信队列和延迟队列

news2024/11/23 20:51:25

MQ高级特性

1.削峰

设置 消费者

测试 添加多条消息

拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费

TTL

Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

可以在管理台新建队列、交换机,绑定

1.图形化操作

添加队列

添加交换机

将交换机和对应的队列进行绑定

时间结束 , 消息失效

2.代码实现

配置 生产者

@Configuration
public class TopicMqTtlConfig {


    @Value("${mq.exchange.name}")
    private String EXCHANGENAME;
    @Value("${mq.queue.name1}")
    private String QUEUENAME1;
    @Value("${mq.queue.name2}")
    private String QUEUENAME2;
    // 1
    // . 交换机
    @Bean("ex1")
    public Exchange getExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
        return exchange;
    }
    // 2。 队列
    @Bean("queue1")
    public Queue getQueue1(){
        Queue queue = QueueBuilder.nonDurable(QUEUENAME1)
                .withArgument("x-message-ttl",30000)//过期时间30秒
                .withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃
                .build();
        return queue;
    }
    @Bean("queue2")
    public Queue getQueue2(){
        Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2)
                .withArgument("x-message-ttl",300000000)//过期时间30秒
                .build();
        return queue2;
    }
    // 3. 交换机和队列进行绑定
    @Bean("binding1")
    public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){
        Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("ttl1.*").noargs();
        return binding1;
    }
    @Bean("binding2")
    public Binding bindQueue2ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){
        Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("ttl2.#").noargs();
        return binding2;
    }
}

测试

添加成功 ttl1只接收10条

时间过期

死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机,因为其他MQ产品中没有交换机的概念),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

比如消息队列的消息过期,如果绑定了死信交换器,那么该消息将发送给死信交换机

消息在什么情况下会成为死信?(面试会问)

1.队列消息长度到最大的限制

最大的长度设置为10当第11条消息进来的时候就会成为死信

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false(不重新回到队列中)

设置消费者为手动签收的状态

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定交换机的方式是什么?

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

// 1.  交换机  :正常的交换机   死信交换机

// 2.队列  :正常的  死信

//3.绑定   正常ex - 正常的que

正常的que和死信交换机

死信ex-死信queue

2.代码实现

@Configuration
public class TopicMqDeadConfig {
    @Value("${mq1.exchange.name1}")
    private String EXCHANGENAME;
    @Value("${mq1.exchange.name2}")
    private String DEADEXCHANGE;
    @Value("${mq1.queue.name1}")
    private String QUEUENAME1;
    @Value("${mq1.queue.name2}")
    private String QUEUENAME2;
    // 声明正常交换机
    @Bean("ex1")
    public Exchange getExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
        return exchange;
    }
    //  正常队列
    @Bean("queue1")
    public Queue getQueue1(){
        Queue queue = QueueBuilder.nonDurable(QUEUENAME1)
                .withArgument("x-message-ttl",30000)//过期时间30秒
                .withArgument("x-dead-letter-exchange",DEADEXCHANGE)
                .withArgument("x-dead-letter-routing-key","dead.test")//将正常队列与死信交换机,死信队列绑定
                //.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃
                .build();
        return queue;
    }
    // 交换机和队列进行绑定
    @Bean("binding1")
    public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){
        Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("normal.*").noargs();
        return binding1;
    }


    // 声明死信交换机
    @Bean("ex2")
    public Exchange getDeadExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(DEADEXCHANGE).durable(false).build();
        return exchange;
    }
    //死信队列
    @Bean("queue2")
    public Queue getQueue2(){
        Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2)
                .build();
        return queue2;
    }
    // 死信交换机和死信队列进行绑定
    @Bean("binding2")
    public Binding bindQueue2ToExchange(@Qualifier("ex2") Exchange exchange,@Qualifier("queue2") Queue queue){
        Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("dead.*").noargs();
        return binding2;
    }


}

测试

如果程序出现错误 拒绝签收

监听正常队列

发送消息 启动测试

总结:

1. 死信交换机和死信队列和普通的没有区别

2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

3. 消息成为死信的三种情况:

        1. 队列消息长度到达限制;

        2. 消费者拒接消费消息,并且不重回队列;

        3. 原队列存在消息过期设置,消息到达超时时间未被消费;

 延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  • 1. 下单后,30分钟未支付,取消订单,回滚库存
  • 2. 新用户注册成功7天后,发送短信问候。

实现方式:

1. 定时器

2. 死信队列

在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

1.配置

添加依赖

  <!--2. rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>




        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <!--nacos 配置中心-->
        <!--配置中心-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>


        <!-- application  bootstrap  -->


        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
        </dependency>


        <!-- nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>




        <dependency>
            <groupId>com.example</groupId>
            <artifactId>sys-comm</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

 

修改配置

2.代码实现

创建实体类

发送消息 测试

过期后放入死信队列

添加依赖

 <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.16</version>
        </dependency>

将json数据转化为对象

获取成功

3.连接数据库

创建表

创建测试类
@RestController
@RequestMapping("order")
public class OrderController {
    @Value("${mq1.exchange.name1}")
    private String EXCHANGENAME;
    //
    @Resource
    private RabbitTemplate rabbitTemplate;
    @GetMapping
    public Result aaa(TabOrder order){
     //1. 消息 存放到mq里面
        String s = JSONUtil.toJsonStr(order);
        // openfeign  --      数据添加到数据库里面
        rabbitTemplate.convertAndSend(EXCHANGENAME, "normal.test", s);
        return Result.success(s);
    }
}

监听normal
import javax.annotation.Resource;
@Component
public class XiaoFeng implements ChannelAwareMessageListener {
    @Resource
    private TabOrderMapper orderMapper;
    @Override
    @RabbitListener(queues = "test_queue_normal")
    public void onMessage(Message message, Channel channel) throws Exception {
        //Thread.sleep(2000);// 20s
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
        // 将字符串转化为 对象


        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            TabOrder order = JSONUtil.toBean(s, TabOrder.class);
            // 将订单的信息 报讯到数据库里面
            int insert = orderMapper.insert(order);
            channel.basicAck(deliveryTag,true); //
        }catch(Exception e){
            //long deliveryTag, boolean multiple, boolean requeue
            System.out.println("拒绝签收消息");
            channel.basicNack(deliveryTag,true,false);// 死信消息
        }
    }
}

监听dead
@Component
public class YanChi implements ChannelAwareMessageListener {


    @Resource
    private TabOrderMapper orderMapper;
    @Override
    @RabbitListener(queues = "test_queue_dead")
    public void onMessage(Message message, Channel channel) throws Exception {
        //Thread.sleep(2000);// 20s
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
        // 将字符串转化为 对象
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            TabOrder order = JSONUtil.toBean(s, TabOrder.class);
            //  order 的状态
            TabOrder tabOrder = orderMapper.selectById(order.getId());
            if(tabOrder.getStatus()==1){
                // 取消
                tabOrder.setStatus(3);
            }
            orderMapper.updateById(tabOrder);
            channel.basicAck(deliveryTag,true); //
        }catch(Exception e){
            //long deliveryTag, boolean multiple, boolean requeue
            System.out.println("拒绝签收消息");
            channel.basicNack(deliveryTag,true,false);// 死信消息
        }
    }
}

测试

成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1265859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

浏览器触发下载Excel文件-Java实现

目录 1:引入maven 2:代码实现 3.导出通讯录信息到Excel文件 4.生成并下载Excel文件部分解释 1:引入maven 添加依赖:首先,在你的项目中添加EasyExcel库的依赖。你可以在项目的构建文件(如Maven的pom.xml)中添加以下依赖项:<dependency><groupId>com.alib…

医疗影像数据集—CT、X光、骨折、阿尔茨海默病MRI、肺部、肿瘤疾病等图像数据集

最近收集了一大波关于CT、X光等医疗方面的数据集包含骨折、阿尔茨海默病MRI、肺部疾病等类型的医疗影像数据&#xff0c;废话不多说&#xff0c;给大家逐一介绍&#xff01;&#xff01; 1、彩色预处理阿尔茨海默病MRI(磁共振成像)图像数据集 彩色预处理阿尔茨海默病MRI(磁共…

平凯星辰携手教育部教育管理信息中心,助力普惠教育数字化

近日&#xff0c;企业级开源分布式数据库厂商平凯星辰与教育部教育管理信息中心达成合作&#xff0c;TiDB 分布式数据库为全国中小学管理服务平台提供全栈服务。双方将携手深入探索领先的数据库技术在教育行业的新场景与新应用&#xff0c;既夯实教育数字化底座&#xff0c;助力…

“抓机遇,促发展”2024亚洲国际人工智能展览会(世亚智博会)

随着人工智能技术的飞速发展&#xff0c;我们正在见证一个全新的时代。2024年即将到来&#xff0c;这一年是人工智能创新将重塑传统界限的一年。从全球领先的科技大国到各类企业&#xff0c;人工智能技术正在以前所未有的速度融入我们的日常生活&#xff0c;推动行业走向未来&a…

34 - 记一次线上SQL死锁事故:如何避免死锁?

之前我参与过一个项目&#xff0c;在项目初期&#xff0c;我们是没有将读写表分离的&#xff0c;而是基于一个主库完成读写操作。在业务量逐渐增大的时候&#xff0c;我们偶尔会收到系统的异常报警信息&#xff0c;DBA 通知我们数据库出现了死锁异常。 按理说业务开始是比较简…

好用的json处理工具He3 JSON

官网地址&#xff1a;https://he3app.com/zh/ json格式化 https://portal.he3app.com/home/extension/json-to-pretty 其他 https://portal.he3app.com/home/category

笔记-PC端wireshark采集FPGA数据的操作

wireshark采集FPGA的数据 目录 一、准备工作二、操作步骤 一、准备工作 1、软件&#xff1a;wireshark 2、平台&#xff1a;PC&#xff08;本人是win11&#xff09;、带有以太网功能的zynq平台 3、网线: 用网线连接zynq板子和PC的以太口端口 二、操作步骤 1、打开任务管理器…

【算法】滑动窗口题单——1.定长滑动窗口⭐

文章目录 1456. 定长子串中元音的最大数目2269. 找到一个数字的 K 美丽值1984. 学生分数的最小差值&#xff08;排序&#xff09;643. 子数组最大平均数 I1343. 大小为 K 且平均值大于等于阈值的子数组数目2090. 半径为 k 的子数组平均值2379. 得到 K 个黑块的最少涂色次数1052…

【问题系列】消费者与MQ连接断开问题解决方案(一)

1. 问题描述 当使用RabbitMQ作为中间件&#xff0c;而消费者为服务时&#xff0c;可能会出现以下情况&#xff1a;在长时间没有消息传递后&#xff0c;消费者与RabbitMQ之间出现连接断开&#xff0c;导致无法处理新消息。解决这一问题的方法是重启Python消费者服务&#xff0c;…

okhttp系列-拦截器的执行顺序

1.将拦截器添加到ArrayList final class RealCall implements Call {Response getResponseWithInterceptorChain() throws IOException {//将Interceptor添加到ArrayListList<Interceptor> interceptors new ArrayList<>();interceptors.addAll(client.intercept…

Android控件全解手册 - 任意View缩放平移工具-源码

Unity3D特效百例案例项目实战源码Android-Unity实战问题汇总游戏脚本-辅助自动化Android控件全解手册再战Android系列Scratch编程案例软考全系列Unity3D学习专栏蓝桥系列ChatGPT和AIGC &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分…

数学建模-基于BL回归模型和决策树模型对早产危险因素的探究和预测

整体求解过程概述(摘要) 近年来&#xff0c;全球早产率总体呈上升趋势&#xff0c;在我国&#xff0c;早产儿以每年 20 万的数目逐年递增&#xff0c;目前早产已经成为重大的公共卫生问题之一。据研究,早产是威胁胎儿及新生儿健康的重要因素&#xff0c;可能会造成死亡或智力体…

每日一题:LeetCode-202.面试题 08.06. 汉诺塔问题

每日一题系列&#xff08;day 07&#xff09; 前言&#xff1a; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f50e…

20世纪30年代的大危机

背景 1929年9月&#xff0c;美国财政部部长安德鲁梅隆向公众保证“现在没有担心的理由&#xff0c;这一繁荣的高潮将会继续下去”。 当时流行的一首儿歌&#xff1a;“梅隆拉响汽笛&#xff0c;胡佛敲起钟&#xff0c;华尔街发出信号&#xff0c;美国往地狱里冲&#xff01;”…

水库大坝安全在线监测系统守护水利工程的坚实屏障

随着科技的发展&#xff0c;水库大坝的安全监测已经进入了一个全新的时代。过去&#xff0c;我们无法实时监测大坝的安全状况&#xff0c;只能在灾难发生后进行补救&#xff0c;现在&#xff0c;通过WX-DB1水库大坝安全在线监测系统&#xff0c;我们能够在第一时间掌握大坝的运…

【创建和排查隐藏进程和隐藏计划任务】

Window 创建隐藏进程和隐藏计划任务&#xff1a; 隐藏进程&#xff1a; 在Windows中&#xff0c;隐藏进程主要通过修改进程属性或使用第三方工具实现。以下是一个使用PowerShell脚本创建隐藏进程的示例&#xff1a; $Script {Start-Process -FilePath "notepad.exe"…

Kubernetes Pod 介绍

文章目录 &#x1f50a;博主介绍&#x1f964;本文内容Pod 介绍与原理讲解Pod 生命周期管理Pod 的健康检查 &#x1f4e2;文章总结&#x1f4e5;博主目标 &#x1f50a;博主介绍 &#x1f31f;我是廖志伟&#xff0c;一名Java开发工程师、Java领域优质创作者、CSDN博客专家、51…

Peter算法小课堂—高精度减法

给大家看个小视频高精度减法_哔哩哔哩_bilibili 基本思想 计算机模拟人类做竖式计算&#xff0c;从而得到正确答案 大家还记得小学时学的“减法竖式”吗&#xff1f;是不是这样 x-y问题 函数总览&#xff1a; 1.converts() 字符串转为高精度大数 2.le() 判断大小 3.sub() …

这个蓄电池监控神技,谁用谁知道!

随着电力需求的不断增长&#xff0c;蓄电池作为能量存储的关键组件在各个领域得到了广泛应用&#xff0c;为了确保蓄电池的可靠性和性能&#xff0c;监控系统变得至关重要。 蓄电池监控系统可以实时监测电池的状态、健康状况以及充放电过程&#xff0c;从而提高电池的寿命、降低…

比尔盖茨:GPT-5不会比GPT-4好多少,生成式AI已达到极限

比尔盖茨一句爆料&#xff0c;成为机器学习社区热议焦点&#xff1a; “GPT-5不会比GPT-4好多少。” 虽然他已不再正式参与微软的日常运营&#xff0c;但仍在担任顾问&#xff0c;并且熟悉OpenAI领导团队的想法。 消息来自德国《商报》&#xff08;Handelsblatt&#xff09;对…