RabbitMQ详解-06RabbitMQ高级

news2025/2/26 13:45:29

1. 过期时间TTL

可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息自动被删除。RabbitMQ可以对消息和队列设置TTL。有以下两种设置方法:

  • 通过队列属性设置,队列中所有消息都有相同的过期时间。
  • 对消息进行单独设置,每条消息TTL可以不同。

若两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

1.1. 设置队列TTL

配置类中设置:

args.put("x-message-ttl",5000);
return QueueBuilder.durable(ITEM_QUEUE).withArguments(args).build();

参数 x-message-ttl 的值必须是非负 32 位整数 (0 <= n <= 2^32-1) ,以毫秒为单位表示 TTL 的值。这样,值 6000 表示存在于 队列 中的当前消息将最多只存活 6 秒钟。

如果不设置TTL,则表示此消息不会过期。如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。
在这里插入图片描述

1.2. 设置消息TTL

在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。在测试类中编写如下方法发送消息并设置过期时间到队列:

/**
     * 过期消息
     * 该消息投递任何交换机或队列中的时候;如果到了过期时间则将从该队列中删除
     */
    @Test
    public void ttlMessageTest(){
        MessageProperties messageProperties = new MessageProperties();
        //设置消息的过期时间,5秒
        messageProperties.setExpiration("5000");Message message = new Message("测试过期消息,5秒钟过期".getBytes(), messageProperties);
        //路由键与队列同名
        rabbitTemplate.convertAndSend("my_ttl_queue", message);
    }

expiration 字段以毫秒为单位表示 TTL 值。且与 x-message-ttl 具有相同的约束条件。因为 expiration 字段必须为字符串类型,broker 将只会接受以字符串形式表达的数字。
当同时指定了 queue 和 message 的 TTL 值,则两者中较小的那个才会起作用。

2. 死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也称为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。

消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。

2.1消息TTL过期

2.1.1演示

生产者:

public class Producer {
	private static final String NORMAL_EXCHANGE = "normal_exchange";
		public static void main(String[] argv) throws Exception {
			try (Channel channel = RabbitMqUtils.getChannel()) {
				channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
				//设置消息的 TTL 时间
				AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
				//该信息是用作演示队列个数限制
				for (int i = 1; i <11 ; i++) {
					String message="info"+i;
					channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties, message.getBytes());
					System.out.println("生产者发送消息:"+message);
				}
			}
		} 
	}
}

消费者1代码(启动之后关闭该消费者 模拟其接收不到消息)

public class Consumer01 {
	//普通交换机名称
	private static final String NORMAL_EXCHANGE = "normal_exchange";
	//死信交换机名称
	private static final String DEAD_EXCHANGE = "dead_exchange";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel();
		//声明死信和普通交换机 类型为 direct
		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		//声明死信队列
		String deadQueue = "dead-queue";
		channel.queueDeclare(deadQueue, false, false, false, null);
		//死信队列绑定死信交换机与 routingkey
		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
		//正常队列绑定死信队列信息
		Map<String, Object> params = new HashMap<>();
		//正常队列设置死信交换机 参数 key 是固定值
		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
		//正常队列设置死信 routing-key 参数 key 是固定值
		params.put("x-dead-letter-routing-key", "lisi");

		String normalQueue = "normal-queue";
		channel.queueDeclare(normalQueue, false, false, false, params);
		channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
		System.out.println("等待接收消息.....");
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody(), "UTF-8");
			System.out.println("Consumer01 接收到消息"+message);
		};
		channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
		});
	}
}

在这里插入图片描述
消费者2代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

public class Consumer02 {
	private static final String DEAD_EXCHANGE = "dead_exchange";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel();
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		String deadQueue = "dead-queue";
		channel.queueDeclare(deadQueue, false, false, false, null);
		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
		System.out.println("等待接收死信队列消息.....");
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody(), "UTF-8");
			System.out.println("Consumer02 接收死信队列的消息" + message);
		};
		channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
		});
	}
}

在这里插入图片描述

2.1.2流程

具体因为队列消息过期而被投递到死信队列的流程:
在这里插入图片描述

2.2 队列达到最大长度

2.2.1演示

生产者:

public class Producer {
	private static final String NORMAL_EXCHANGE = "normal_exchange";
		public static void main(String[] argv) throws Exception {
			try (Channel channel = RabbitMqUtils.getChannel()) {
				channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
				//该信息是用作演示队列个数限制
				for (int i = 1; i <11 ; i++) {
					String message="info"+i;
					channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
					System.out.println("生产者发送消息:"+message);
				}
			}
		} 
	}
}

消费者1修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)
在这里插入图片描述
注意此时需要把原先队列删除 因为参数改变了
消费者2代码不变(启动 C2 消费者)
在这里插入图片描述

2.2.2流程

消息超过队列最大消息长度而被投递到死信队列的流程在前面的图中已包含。

2.3消息被拒

消息生产者代码同上生产者一致

消费者1代码(启动之后关闭该消费者 模拟其接收不到消息)

public class Consumer01 {
	//普通交换机名称
	private static final String NORMAL_EXCHANGE = "normal_exchange";
	//死信交换机名称
	private static final String DEAD_EXCHANGE = "dead_exchange";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel();
		//声明死信和普通交换机 类型为 direct
		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		//声明死信队列
		String deadQueue = "dead-queue";
		channel.queueDeclare(deadQueue, false, false, false, null);
		//死信队列绑定死信交换机与 routingkey
		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
		//正常队列绑定死信队列信息
		Map<String, Object> params = new HashMap<>();
		//正常队列设置死信交换机 参数 key 是固定值
		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
		//正常队列设置死信 routing-key 参数 key 是固定值
		params.put("x-dead-letter-routing-key", "lisi");
		String normalQueue = "normal-queue";
		channel.queueDeclare(normalQueue, false, false, false, params);
		channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
		System.out.println("等待接收消息.....");
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody(), "UTF-8");
			if(message.equals("info5")){
				System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
				//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
				channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
			}else {
				System.out.println("Consumer01 接收到消息"+message);
				channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
			}
		};
		boolean autoAck = false;
		channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
		});
	}
}

在这里插入图片描述
消费者代码不变
启动消费者 1 然后再启动消费者 2
在这里插入图片描述

3. 延迟队列

延迟队列存储的对象是对应的延迟消息;所谓“延迟消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

在RabbitMQ中延迟队列可以通过 过期时间 + 死信队列 来实现;具体如下流程图所示:
在这里插入图片描述
在上图中;分别设置了两个5秒、10秒的过期队列,然后等到时间到了则会自动将这些消息转移投递到对应的死信队列中,然后消费者再从这些死信队列接收消息就可以实现消息的延迟接收。

延迟队列的应用场景;如:

  • 在电商项目中的支付场景;如果在用户下单之后的几十分钟内没有支付成功;那么这个支付的订单算是支付失败,要进行支付失败的异常处理(将库存加回去),这时候可以通过使用延迟队列来处理。
  • 在系统中如有需要在指定的某个时间之后执行的任务都可以通过延迟队列处理。

4. 消息确认机制

确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。

4.1 发布确认

有两种方式:消息发送成功确认和消息发送失败回调。

4.1.1消息发送成功确认

在配置文件当中需要添加:

spring.rabbitmq.publisher-confirm-type=correlated

⚫ NONE
禁用发布确认模式,是默认值
⚫ CORRELATED
发布消息成功到交换器后会触发回调方法
⚫ SIMPLE
经测试有两种效果:
其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

消息确认回调方法:

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息确认成功....");
        } else {
            //处理丢失的消息
            System.out.println("消息确认失败," + cause);
        }
    }
}

发送消息:

@Test
    public void queueTest(){
        //路由键与队列同名
        rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息。");
    }

管理界面确认消息发送成功:
在这里插入图片描述
消息确认回调:
在这里插入图片描述

4.1.2消息发送失败回调

消息失败回调方法:

@Component
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	//rabbitTemplate 注入之后就设置该值
	@PostConstruct
	private void init() {
		rabbitTemplate.setConfirmCallback(this);
		/**
		* true:
		* 交换机无法将消息进行路由时,会将该消息返回给生产者
		* false:
		* 如果发现消息无法进行路由,则直接丢弃
		*/
		rabbitTemplate.setMandatory(true);
		//设置回退消息交给谁处理
		rabbitTemplate.setReturnCallback(this);
	}

    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        String msgJson  = new String(message.getBody());
        System.out.println("Returned Message:"+msgJson);
    }
}

模拟消息发送失败:

@Test
public void testFailQueueTest() throws InterruptedException {
    //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
    amqpTemplate.convertAndSend("test_fail_exchange", "", "测试消息发送失败进行确认应答。");
}

在这里插入图片描述

4.2 事务支持

场景:业务处理伴随消息的发送,业务处理失败(事务回滚)后要求消息不发送。rabbitmq 使用调用者的外部事务,通常是首选,因为它是非侵入性的(低耦合)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1841756.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32学习笔记(六)--引脚重映射详解

STM32F103C8T6引脚定义&#xff1a; 在STM32微控制器中&#xff0c;外设引脚的复用功能&#xff08;Alternate Function&#xff0c;AF&#xff09;有时会出现冲突&#xff0c;例如当USART2_CTS和TIM2_CH1同时需要使用相同的引脚时。此时&#xff0c;可以通过引脚重映射功能&am…

AI播客下载:The Gradient-AI前沿见解

The Gradient 是一个致力于让更多人轻松了解人工智能&#xff0c;并促进人工智能社区内讨论的组织。我们目前开展的项目包括 The Gradient 杂志、The Gradient 播客、The Update 通讯以及 Mastodon 实例 Sigmoid Social。 我们是一个由来自不同机构和公司的研究生、研究人员及…

三十分钟学会RabbitMQ

1、初识MQ 1.1 MQ是什么&#xff1f; MQ(message queue)&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO先入先出&#xff0c;只不过队列中存放的内容是message而已&#xff0c;还是一种跨进程的通信机制&#xff0c;用于上下游传递消息。在互联网架构中…

Wills Room Environment Dormitory Room Environment

有126个独特的网格。包括所有展示的资产和优质资产。具有良好的细节水平,并针对Gameready项目进行了优化。 艺术家Jonjo Hemmens创造的环境 如果想填充你的游戏环境或任何类型的虚拟制作级别,这里有一个包,你可以获得并使用它来得到高质量的视觉效果和优化的资产。 下载:​…

Unicorn批量模拟执行爆破实践

实践题目&#xff1a;CFI-CTF 2018 Automated Reversing 代码与附件地址&#xff1a;https://github.com/Airrcat/unicorn_loader 目标附件如&#xff1a; 每份附件的代码大致如下&#xff1a; 简单来说&#xff0c;程序会接收命令行参数并作一个字节的运算后与一固定值的字…

转型技术管理:九大步骤解锁高效管理新境界

文章目录 引言一、寻求反馈二、从员工的角度看待问题三、总览全局四、管理自己的情绪五、赞赏员工的出色工作六、在人前支持员工七、管理自己的职业生涯八、认识到自己也许存在偏见&#xff0c;与不同于自己的人交流九、在工作中建立信任和沟通总结 引言 在快速变化的科技浪潮…

SparkSQL的分布式执行引擎-Thrift服务:学习总结(第七天)

系列文章目录 SparkSQL的分布式执行引擎 1、启动Thrift服务 2、beeline连接Thrift服务 3、开发工具连接Thrift服务 4、控制台编写SQL代码 文章目录 系列文章目录前言一、SparkSQL的分布式执行引擎(了解)1、启动Thrift服务2、beeline连接Thrift服务3、开发工具连接Thrift服务4、…

opencascade AIS_InteractiveContext源码学习相关枚举 AIS_SelectionScheme AIS_StatusOfPick

AIS_SelectionScheme 枚举 AIS_SelectionScheme 设置交互上下文中的选择方案。 枚举值&#xff1a; AIS_SelectionScheme_UNKNOWN 未定义的方案 AIS_SelectionScheme_Replace 清除当前选择并选择检测到的对象 AIS_SelectionScheme_Add 将检测到的对象添加到当前选择 AIS_…

【Kubernetes项目部署】k8s集群+高可用、负载均衡+防火墙

项目架构图 &#xff08;1&#xff09;部署 kubernetes 集群 详见&#xff1a;http://t.csdnimg.cn/RLveS &#xff08;2&#xff09; 在 Kubernetes 环境中&#xff0c;通过yaml文件的方式&#xff0c;创建2个Nginx Pod分别放置在两个不同的节点上&#xff1b; Pod使用hostP…

智慧学习实践系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;企业管理&#xff0c;任务管理&#xff0c;公告管理&#xff0c;菜单管理&#xff0c;用户管理&#xff0c;基础数据管理 企业账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;任务…

电商API接口详述:涵盖订单、库存等多功能接口介绍

电商商家自研管理系统&#xff0c;线下ERP系统或WMS系统想要接入电商平台订单打单发货&#xff0c;通过点三电商API可以一键对接多个电商平台&#xff0c;帮助商家、ERP/WMS服务商快速开发电商模块&#xff0c;实现电商业务管理功能&#xff0c;那么点三电商API接口有哪些可用接…

省市区下拉选择:3个el-select(附完整代码+json)

目录 直接上做出的效果&#xff1a; 页面代码&#xff1a; 使用click.native&#xff1a; data及引入&#xff1a; 初始化&#xff1a; methods&#xff1a; JSON: 示例结构&#xff1a; 1.code.json 2.pca-code.json 回显&#xff1a; 视频效果&#xff1a; 直接上做出…

盘点下常见 HDFS JournalNode 异常的问题原因和修复方法

盘点下常见 HDFS JournalNode 异常的问题原因和修复方法 最近在多个客户现场以及公司内部环境&#xff0c;都遇到了因为 JournalNode 异常导致 HDFS 服务不可用的问题&#xff0c;在此总结下相关知识。 1 HDFS HA 高可用和 JournalNode 概述 HDFS namenode 有 SPOF 单点故障…

MS3121地隔离放大器

MS3121 是一款应用于车载音频系统的地隔离放大 器。芯片可以很好地解决汽车音频系统中的绕线电阻问 题&#xff0c;以及由车载电子设备带来的噪声问题。另外&#xff0c;芯片 所需要的外围电容小&#xff0c;便于系统的集成。注意&#xff0c;芯片的 地电位需要和后级音频功…

Mac数据如何恢复?3 款最佳 Mac 恢复软件

如果您认为 Mac 上已删除的文件永远丢失了&#xff0c;那您就大错特错了&#xff01;实际上&#xff0c;即使您清空了 Mac 上的垃圾箱&#xff0c;也有许多解决方案可以帮助您恢复已删除的文件。最好的解决方案之一是 Mac 恢复删除软件。最好的Mac 恢复删除应用程序可以轻松准确…

docker部署ClamAV集成java和python实现文件病毒扫描

介绍 官方文档&#xff1a;https://docs.clamav.net/manual/Signatures/DatabaseInfo.html ClamAV 是一个开源的反病毒引擎&#xff0c;它由多个模块组成&#xff0c;负责不同的任务处理。以下是 ClamAV 的主要模块和它们的功能&#xff1a; clamd&#xff1a;clamd 是 Clam…

Cookie、Session、Token的关系和区别

关系 Session与Cookie&#xff1a;Session通常依赖于Cookie来工作。当服务器为客户端创建一个Session时&#xff0c;它会在服务器上存储与客户端相关的信息&#xff0c;并将一个唯一的SessionID通过Cookie发送给客户端。客户端在后续的请求中会携带这个Cookie&#xff08;包含…

视频监控管理平台的日志功能的重要性

日志功能的重要性 视频监控平台在日常工作生活中越来越重要&#xff0c;具有完备的平台日志&#xff0c;不仅可以增强视频监控系统的自身安全性&#xff0c;还能在更大程度上保障社会的安全与稳定。 &#xff08;一&#xff09;安全保障 视频监控平台作为安全防护…

流媒体学习之路(WebRTC)——音频NackTracker优化思路(8)

流媒体学习之路(WebRTC)——音频NackTracker优化思路&#xff08;8&#xff09; —— 我正在的github给大家开发一个用于做实验的项目 —— github.com/qw225967/Bifrost目标&#xff1a;可以让大家熟悉各类Qos能力、带宽估计能力&#xff0c;提供每个环节关键参数调节接口并实…

C51学习归纳13 --- AD/DA转换

AD/DA转换实现了计算机和模拟信号的连接&#xff0c;扩展了计算机的应用场景&#xff0c;为模拟信号数字化提供了底层支持。 AD转换通常是多个输入通道&#xff0c;使用多路选择器连接到AD开关&#xff0c;实现AD多路复用的目的&#xff0c;提高利用率。 AD/DA转换可以使用串口…