RabbitMQ死信队列、延迟队列原理实战

news2025/1/17 21:46:20

1.死信队列+TTL

  • 什么是TTL

    • time to live:消息存活时间
    • 如果消息在存活时间内未被消费,则会被清除
    • RabbitMQ支持两种TTL设置
      • 单独消息进行配置TTL
      • 整个队列进行配置TTL(使用居多)
  • 什么是RabbitMQ的死信队列

    • 没有被及时消费的消息存放的队列
  • 什么是RabbitMQ的死信交换机

    • Dead Letter Exchange(死信交换机,缩写DLX),当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 消息有哪几种情况成为死信

    • 消费者拒收消息(basic.reject/basic.nack),并且没有重新入队requeue=false

    • 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time to live)

    • 队列的消息长度达到极限

    • 结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

2.使用管控台测试死信队列

  • RabbitMQ管控台消息TTL测试

    • 队列过期时间使用参数,对整个队列消息统一过期

      • x-message-ttl:单位ms毫秒
    • 消息过期时间使用参数(如果队列头部消息未过期,队列中间消息已经过期,该消息还在队列里面)

      • expiration:单位ms毫秒
    • 两者都配置的话,时间短的先触发

  • RabbitMQ Web控制台测试

    • 新建死信交换机(和普通交换机没区别)

      外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    • 新建死信队列(和普通队列没区别)

      外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    • 死信交换机和死信队列绑定

      外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    • 新建普通队列,设置过期时间、指定死信交换机

      外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    • 测试:直接在Web控制台往普通队列发送消息即可

      外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

3.延迟队列

  • 什么是延迟队列

    • 一种带有延迟功能的消息队列,Producer将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息
  • 业界的一些实现延迟方式

    • 定时任务高精度轮训
    • 采用RocketMQ自带延迟消息功能
    • RabbitMQ本身是不支持延迟队列的,怎么办?
      • 结合死信队列的特性,就可以做到延迟消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 交换机和队列注册代码

    package com.gen.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class RabbitMQConfig {
    
        /**
         * 死信交换机
         */
        public static final String DEAD_EXCHANGE = "dead_exchange";
    
        /**
         * 死信队列
         */
        public static final String DEAD_QUEUE = "dead_queue";
    
        /**
         * 死信路由键
         */
        public static final String DEAD_ROUTING_KEY = "dead_routing_key";
    
        /**
         * 死信交换机
         *
         * @return
         */
        @Bean
        public Exchange deadExchange() {
            return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();
        }
    
        /**
         * 死信队列
         *
         * @return
         */
        @Bean
        public Queue deadQueue() {
            return QueueBuilder.durable(DEAD_QUEUE).build();
        }
    
        /**
         * 死信交换机与死信队列进行绑定
         *
         * @param deadQueue
         * @param deadExchange
         * @return
         */
        @Bean
        public Binding deadBinding(Queue deadQueue, Exchange deadExchange) {
            return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
        }
    
        /**
         * 普通交换机
         */
        public static final String ORDER_EXCHANGE = "order_exchange";
    
        /**
         * 普通队列
         */
        public static final String ORDER_QUEUE = "order_queue";
    
        /**
         * 普通路由键
         */
        public static final String ORDER_ROUTING_KEY = "order_routing_key";
    
        /**
         * 普通交换机
         *
         * @return
         */
        @Bean
        public Exchange orderExchange() {
            return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();
        }
    
        /**
         * 普通队列
         *
         * @return
         */
        @Bean
        public Queue orderQueue() {
            Map<String, Object> args = new HashMap<>(3);
            // 过期时间,单位毫秒
            args.put("x-message-ttl", 10000);
            // 消息过期后,进入到死信交换机
            args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            // 消息过期后,进入到死信交换机的路由键
            args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
            return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
        }
    
        /**
         * 普通交换机与普通队列进行绑定
         *
         * @param orderQueue
         * @param orderExchange
         * @return
         */
        @Bean
        public Binding orderBinding(Queue orderQueue, Exchange orderExchange) {
            return BindingBuilder.bind(orderQueue).to(orderExchange).with(ORDER_ROUTING_KEY).noargs();
        }
    
    }
    
  • 消息生产者

    package com.gen;
    
    import com.gen.config.RabbitMQConfig;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class GenRabbitmqApplicationTests {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        void send() {
            this.rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, RabbitMQConfig.ORDER_ROUTING_KEY, "测试延迟队列,设置10s");
        }
    
    }
    
  • 消息消费者(只监听消费死信队列,不监听消费普通队列)

    package com.gen.listener;
    
    import com.gen.config.RabbitMQConfig;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
    public class DeadMQListener {
    
        @RabbitHandler
        public void deadConsumer(String msg, Message message, Channel channel) throws IOException {
            System.out.println(msg);
            // 成功确认,消费成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1458518.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《雾锁王国》游戏存档迁移教程,本地存档/服务器数据迁移到服务器

《雾锁王国》游戏存档迁移教程&#xff0c;本地存档/服务器数据迁移到服务器 一、本地存档转到服务器&#xff1a; 首先保存本地存档数据&#xff0c;例如我的存档路径为C:\Program Files(x86)\Steam\Userdata 然后远程登录到你的Windows云服务器&#xff0c;打开任务管理器…

戴尔Dell R740服务器开机冒烟亮黄灯故障维修

今天分享的是一台过保修期的DELL PowerEdge R740服务器开机冒烟的维修案例。先上图&#xff1a; 接到用户报修后工程师立即响应&#xff0c;由于用户也是刚开工第一天服务器开机就出现了这种祥龙吐雾的祥兆&#xff0c;导致工厂业务流程无法正常使用&#xff0c;这台机器在东莞…

【Day51】代码随想录之动态规划完全背包_139.单词拆分_背包总结

文章目录 动态规划理论基础动规五部曲&#xff1a;出现结果不正确&#xff1a; 139.单词拆分背包问题0-1背包完全背包 动态规划理论基础 动规五部曲&#xff1a; 确定dp数组 下标及dp[i] 的含义。递推公式&#xff1a;比如斐波那契数列 dp[i] dp[i-1] dp[i-2]。初始化dp数组…

聊聊机器人视觉定位抓取那些事儿

随着科技的不断进步和工业智能化的加速推进&#xff0c;巡检机器人已经成为现代工业生产和设施管理中不可或缺的重要组成部分。而在巡检机器人的发展过程中&#xff0c;视觉定位抓取技术的应用日益广泛&#xff0c;为智能化巡检运维提供了全新的可能性。 当巡检操作机器人精准抓…

牛客网 JZ11 旋转数组的最小数字

思路&#xff1a; 法一&#xff1a; 因为数组是非降序的&#xff0c;也就代表该数组是有序的&#xff0c;如果将一开始的元素旋转到后面&#xff0c;那么就会出现一个断点&#xff0c;从大值变为小值的这么一个断点&#xff08;这是本题的特点&#xff09; 举例如下&#xf…

WEB基础及http协议(Apache)

一、httpd安装组成 http服务基于C/S结构 1、常见http服务器程序 httpd apache&#xff0c;存在C10K&#xff08;10K connections&#xff09;问题nginx 解决C10K问题lighttpdIIS .asp 应用程序服务器tomcat .jsp 应用程序服务器jetty 开源的servlet容器&#xff0c;基于Java…

深度学习之pytorch实现逻辑斯蒂回归

深度学习之pytorch实现逻辑斯蒂回归 解决的问题数学公式logiatic函数损失值 代码与线性回归代码的区别数据损失值构造回归的函数 结果分析 解决的问题 logistic 适用于分类问题&#xff0c;这里案例( y为0和1 &#xff0c;0和 1 分别代表一类&#xff09; 于解决二分类&#xf…

学习Vue3的第二天

目录 Vue3核心语法 OptionsAPI 与 CompositionAPI setup函数 setup 概述 setup 语法糖 ref 创建&#xff1a;基本类型的响应式数据 reactive 创建&#xff1a;对象类型的响应式数据 ref 创建&#xff1a;对象类型的响应式数据 ref 对比 reactive toRefs 与 toRef co…

实现一个可自适应高度,带有左上角图标,右侧按钮自适应位置的输入框

要点&#xff1a;1.高度自适应&#xff0c;当换行时&#xff0c;高度增加&#xff0c;当删减一定也会高度减少。 2.左上角留出一个logo位置 3.右侧的按钮位置&#xff0c;当最开始的一行没有挤占到时&#xff0c;位于右侧。当一行挤占到了&#xff0c;按钮换行到下方。 效果图&…

【开发战斗系统很难?这几个重要的技术点千万不能忽视!】

作为网游中最重要的MMO类游戏的前身&#xff0c;ARPG游戏是一种非常经典且流行的游戏类型&#xff0c;其诞生的经典之作如&#xff1a;《魂系》、《暗黑破坏神系列》、《塞尔达传说系列》等&#xff0c;大家也是耳熟能详。 可以说ARPG游戏无论从体量还是技术含量都是游戏开发领…

openGauss 5.0.0全密态数据库应用小试

前言 openGauss HCIA教材中&#xff0c;安全是一个重要的章节&#xff0c;在实际项目中&#xff0c;随着网络安全和信息安全形势的变化&#xff0c;企业也越来越重视数据库安全。去年在HALP内部进行openGauss培训时&#xff0c;安全特性就被学员们提出来要重点讲解&#xff0c…

Leetcode 283.移动零

给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 示例 1: 输入: nums [0,1,0,3,12] 输出: [1,3,12,0,0]示例 2: 输入: nums [0] 输出: […

debug - 只要在内存中有显示相关的数据, 就会被CE找到

文章目录 debug - 只要在内存中有显示相关的实际数据, 就会被CE找到概述笔记demo实现demo运行效果用CE查找实际数据地址找到自己的调试点 - 方法1找到自己的调试点 - 方法2打补丁备注END debug - 只要在内存中有显示相关的实际数据, 就会被CE找到 概述 自己写了一个demo, 想验…

【RabbitMQ快速入门】初识RabbitMQ

个人名片&#xff1a; &#x1f43c;作者简介&#xff1a;一名大三在校生&#xff0c;喜欢AI编程&#x1f38b; &#x1f43b;‍❄️个人主页&#x1f947;&#xff1a;落798. &#x1f43c;个人WeChat&#xff1a;hmmwx53 &#x1f54a;️系列专栏&#xff1a;&#x1f5bc;️…

BioTech - 交联质谱 (Crosslinks) 的常见数据格式说明

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/136190750 交联质谱 (Crosslinking Mass Spectrometry&#xff0c;Crosslinks) 技术 是一种结合化学交联剂和质谱仪的方法&#xff0c;用于研究蛋…

OSQP文档学习

OSQP官方文档 1 QSQP简介 OSQP求解形式为的凸二次规划&#xff1a; x ∈ R n x∈R^n x∈Rn&#xff1a;优化变量 P ∈ S n P∈S^n_ P∈Sn​&#xff1a;半正定矩阵 特征 &#xff08;1&#xff09;高效&#xff1a;使用了一种自定义的基于ADMM的一阶方法&#xff0c;只需…

Android的Compose

Jetpack Compose 是用于构建原生 Android 界面的新工具包&#xff0c;无需修改任何 XML 布局&#xff0c;也不需要使用布局编辑器。相反&#xff0c;只需调用可组合函数来定义所需的元素&#xff0c;Compose 编译器即会完成后面的所有工作。 简而言之&#xff0c;使用Compose&…

算法-矩阵置零

1、题目来源 73. 矩阵置零 - 力扣&#xff08;LeetCode&#xff09; 2、题目描述 给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,1,1],[1,0,1…

Javascript怎么输出内容?两种常见方式以及控制台介绍

javascript是一种非常重要的编程语言&#xff0c;在许多网页中它被广泛使用&#xff0c;可以实现许多交互效果和动态效果。输出是javascript中最基本的操作之一&#xff0c;下面将介绍两种常见的输出方式。 一、使用console.log()函数输出 console.log()函数是常用的输出函数…

Django学习笔记-创建第一个django项目

1.创建一个虚拟环境的python项目 2.点击解释器设置 3.安装django包 4.终端选择Command Prompt 5.创建django项目运行django-admin startproject demo01(自命名) 6.修改连接数据库为mysql 7.修改语言(中国汉语)和时区(亚洲上海)USE_TZ改为False,否则时区不生效 8.修改TEMPLA…