【RabbitMQ高级——过期时间TTL+死信队列】

news2024/10/12 2:15:02

1. 过期时间TTL概述

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。

目前有两种方法可以设置。

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
  • 第二种方法是对消息进行单独设置,每条消息TTL可以不同。

如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

2.TTL设置方法

2.1. 通过队列属性设置

package com.zju.service.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author guozonghao
 * @Description
 * @create 2024-10-04 09:37
 */
@Configuration
public class TTLRabbitMqConfiguration {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl_direct_exchange",true,false);
    }

    @Bean
    public Queue directTTLQueue(){
        //设置过期时间
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);//这里一定是int类型
        return new Queue("ttl.direct.queue",true,false,false,args);
    }

    @Bean
    public Binding directTTLBinding(){
        return BindingBuilder.bind(directTTLQueue()).to(ttlDirectExchange()).with("ttl");
    }
}

2.2. 通过消息属性设置

通过AMQP.BasicProperties这个类在发送消息时配置相关的属性

public class MessageTTLProducer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("47.104.141.27");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            channel.queueDeclare("ttl.queue2", true, false, false, null);
            // 6: 准备发送消息的内容
            String message = "你好,学相伴!!!";
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("x", "1");
            headers.put("y", "1");
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2) // 传送方式
                    .priority(1)
                    .contentEncoding("UTF-8") // 编码方式
                    .expiration("5000") // 过期时间
                    .headers(headers).build(); //自定义属性
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routing
            // @params3: 属性配置
            // @params4: 发送消息的内容
            for (int i = 0; i <10 ; i++) {
                channel.basicPublish("", "ttl.queue2", basicProperties, message.getBytes());
                System.out.println("消息发送成功!");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

3. 死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:

  • 消息被拒绝(没有被ack)
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。
在这里插入图片描述

3.1 创建死信队列

package com.zju.service.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author guozonghao
 * @Description
 * @create 2024-10-04 09:37
 */
@Configuration
public class DeadRabbitMqConfiguration {

    @Bean
    public DirectExchange deadDirectExchange(){
        return new DirectExchange("dead_direct_exchange",true,false);
    }

    @Bean
    public Queue directDeadQueue(){
        return new Queue("dead.direct.queue",true);
    }

    @Bean
    public Binding directDeadBinding(){
        return BindingBuilder.bind(directDeadQueue()).to(deadDirectExchange()).with("dead");
    }
}

3.2 其他队列绑定死信队列

package com.zju.service.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author guozonghao
 * @Description
 * @create 2024-10-04 09:37
 */
@Configuration
public class TTLRabbitMqConfiguration {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl_direct_exchange",true,false);
    }

    @Bean
    public Queue directTTLQueue(){
        //设置过期时间
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);//设置过期时间
        args.put("x-max-length",5);//设置队列的最大长度
        args.put("x-dead-letter-exchange","dead_direct_exchange");//绑定自己创建的死信交换机
        args.put("x-dead-letter-routing-key","dead");//设置死信队列的路由key
        return new Queue("ttl.direct.queue",true,false,false,args);
    }

    @Bean
    public Binding directTTLBinding(){
        return BindingBuilder.bind(directTTLQueue()).to(ttlDirectExchange()).with("ttl");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2206585.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu安装Apache教程

系统版本&#xff1a;Ubuntu版本 23.04 Ubuntu是一款功能强大且用户友好的操作系统&#xff0c;而Apache是一款广泛使用的Web服务器软件。在Ubuntu上安装Apache可以帮助用户搭建自己的网站或者进行Web开发。为大家介绍如何在Ubuntu上安装Apache&#xff0c;并提供详细的教程和操…

青云AI算力创新:直击AI落地痛点 打造企业数智化基石

在当今这个数字化、智能化的时代&#xff0c;企业数字化转型、智能化升级应用实践在加速&#xff0c;AI算力已经成为企业数字化转型和智能化升级的重要基石&#xff0c;而AI算力在推动技术创新和业务增长中起到了关键作用。青云科技近日举办的AI算力发布会&#xff0c;标志着AI…

Numpy模块中的ndarray对象属性、数组的重塑、合并、分割

一、ndarray对象属性介绍 说明&#xff1a;ndim属性返回数组的维度&#xff0c;即数组有多少个轴。例如&#xff0c;一维数组的ndim为1&#xff0c;二维数组的ndim为2&#xff0c;以此类推。 示例1&#xff1a; # coding:utf-8 import numpy as np wnp.array([[1,2,3],[4,5,6…

Java如何保证线程T1,T2,T3 顺序执行?

文章目录 方案 1&#xff1a;使用 Thread.join()方案 2&#xff1a;使用 Lock 和 Condition方案 3&#xff1a;使用 CyclicBarrier方案 4&#xff1a;使用 Semaphore总结 博主介绍&#xff1a;全网粉丝10w、CSDN合伙人、华为云特邀云享专家&#xff0c;阿里云专家博主、星级博主…

初始操作系统篇(1)—— 操作系统的概念与分类

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a; 操作系统 目录 操作系统的基本概念 操作系统的概念 操作系统的特征 并发 共享 虚拟 异步 操作系统的目标和功能 操作系统的发展与分…

T11:优化器对比实验

T11周&#xff1a;优化器对比实验 **一、前期工作**1.设置GPU,导入库 **二、数据预处理**1.导入数据2.检查数据3.配置数据集4.数据可视化 **三、构建模型****四、训练模型****五、模型评估**六、总结 &#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&a…

QT实现Opencv图像处理

案例 基于QT的人脸识别 pro文件需要加以下代码 INCLUDEPATH E:/opencv/opencv3.4-qt-intall/install/include INCLUDEPATH E:/opencv/opencv3.4-qt-intall/install/include/opencv INCLUDEPATH E:/opencv/opencv3.4-qt-intall/install/include/opencv2 LIBS E:/opencv/o…

Linux go语言配置

首先安装go语言 先从go官方下载源码 解压&#xff1a; tar -xzvf go1.23.1.linux-amd64.tar.gz 移动go到 /usr/local/ mv go /usr/local 设置环境变量 vi ~/.bashrc source ~/.bashrc 重新获取文件 输入 go version 查看版本

STM32 A/D转换器

目录 模拟量输入通道 A/D转换器简介 模拟量输入信号类型与量程自动转换 量程自动转换 STM32F103ZET6集成的ADC模块 STM32的ADC的主要特征 STM32的ADC模块结构 ADC中断事件主要有以下3个&#xff1a; 模拟量输入通道 模拟量输入通道一般由信号处理、模拟开关、放大器、采…

MES管理系统解决方案常见的应用场景

在现代制造业的浪潮中&#xff0c;车间管理的效率与智能化水平成为了企业竞争力的关键因素。许多车间管理者都耳闻MES管理系统解决方案能够显著优化生产执行流程&#xff0c;但由于缺乏亲身体验&#xff0c;往往对此持保留态度。那么&#xff0c;MES管理系统究竟能否如传闻般发…

po框架的了解和应用

https://www.cnblogs.com/xiaolehong/p/18458470 笔记 任务:1、通过po框架输入测试报告 2、编写自动化测试框架 3、总结测试讲解稿 自动化测试框架概念: 自动化测试框架是一个集成体系,这个体系中包含测试功能的函数、测试数据源、测试对以及重要的模块。 作用:用于解决或…

MIDIPLUS 50周年丨中国国际乐器展览会首日盛况

10月10日&#xff0c;由中国乐器协会、上海国展展览中心有限公司、法兰克福展览&#xff08;上海&#xff09;有限公司共同主办的中国&#xff08;上海&#xff09;国际乐器展览会在上海新国际博览中心&#xff08;上海市浦东新区龙阳路2345号&#xff09;盛大开幕。 2024上海…

ART 光学跟踪系统:通过 VR HMD 解锁完全沉浸式 VR 体验

在虚拟现实体验中&#xff0c;完全沉浸式虚拟现实体验应该既准确又舒适。当与现实世界的物体融合时&#xff0c;虚拟现实的表现必须与现实精确匹配。这意味着所使用的运动跟踪系统必须为整套项目提供可靠且可重复的高精度运动数据&#xff0c;以及体感无法察觉到的超低延迟。AR…

水深探测仪的作用和使用方法

在水域救援的行动里&#xff0c;救援人员时刻面临着复杂多变、充满未知的水域状况。当接到救援任务奔赴现场&#xff0c;那片需要涉足的水域就像一个神秘莫测的异世界&#xff0c;挑战着所有人的认知与勇气。 水深探测仪作为一种专用于测量水域深度的设备&#xff0c;通过声波和…

宝塔 进程守护管理器 神坑,再次跌入。thinkphp-queue队列 勤勤学长

如果&#xff0c;你有在使用【进程守护管理器】&#xff0c;记得在更新/重启&#xff0c;甚至卸载重新安装后&#xff0c;重启服务器。 事情的起因是&#xff0c;昨日服务器突然异常&#xff0c;网站无法正常访问&#xff0c;进入宝塔面板&#xff0c;发现 cpu和负载率均超过1…

甘肃小米,一口软糯,满是乡愁

&#x1f388;甘肃小米&#xff0c;自然的馈赠&#x1f388;&#x1f33e;家人们&#xff0c;今天必须给大家安利来自甘肃的小米&#xff01;&#x1f49b;✨甘肃&#xff0c;那片广袤而神奇的土地&#xff0c;孕育出的小米有着别样的魅力。颗粒饱满&#xff0c;色泽金黄&#…

RDD优化:缓存和checkpoint机制、数据共享(广播变量、累加器)、RDD的依赖关系、shuffle过程、并行度说明

文章目录 1. 缓存和checkpoint机制1.1 缓存使用1.2 checkpoint1.3 缓存和checkpoint的区别 2. 数据共享2.1 广播变量2.2 累加器 3. RDD依赖关系4.shuffle过程4.1 shuffle介绍4.2 spark计算要尽量避免shuffle 5. 并行度 1. 缓存和checkpoint机制 缓存和checkpoint也叫作rdd的持…

源代码加密有哪些技巧呢?除了用加密软件还有哪些方法?

导语&#xff1a;源代码加密是保护软件核心资产的主要方式了&#xff0c;可以通过多种技术措施确保源代码不被未授权访问、复制或篡改&#xff0c;防止泄密问题。这篇文章是一些有效的源代码加密技巧&#xff0c;欢迎您的阅读&#xff01; 源代码加密常用技巧1、访问控制&…

俏生元将传统膳食智慧融入现代生活,自然成分绽放健康光彩

近年来&#xff0c;当代女性健康食品市场正经历快速发展和显著变化。随着女性健康意识的提升&#xff0c;市场对专门针对女性健康的产品需求快速上升。女性消费者对健康的关注不再局限于表面&#xff0c;而是越来越注重内在健康和生活质量的提升。此外&#xff0c;中式养生文化…

Python 基于 Bert 的中文情歌分析,多分类中文情感分析

前言 在自然语言处理&#xff08;NLP&#xff09;领域中&#xff0c;情感分析是一项非常常见的任务。它的目标是判断文本的情感倾向&#xff0c;例如在社交媒体上的评论、产品评价、电影评论等数据中&#xff0c;识别文本是正面的、负面的&#xff0c;还是中性的。与传统的二分…