本文将介绍一些RabbitMQ的重要特性。
官方文档:Protocol Extensions | RabbitMQ
本文是使用的Spring整合RabbitMQ环境。
生产者发送确认(publish confirm)
当消息发送给消息队列,如何确保消息队列一定收到消息呢,RabbitMQ通过 事务机制 和 发送方确认(publisher confirm)来实现。事务机制比较消耗性能,实际使用的不是很多,所以这里主要介绍发送方确认机制。
发送确认机制有两种模式来完整实现实现。一个是Confirm确认模式,另一个是return回退模式。
confirm确认模式
Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, ⽆论消息是否到达Exchange, 这个监听都会被执行, 如果Exchange成功收到, ACK( Acknowledge character , 确认字符)为true, 如果没收到消息, ACK就为false。
代码
配置文件
spring:
rabbitmq:
host: IP地址
port: 端口
username: 用户名
password: 密码
virtual-host: 虚拟主机
publisher-confirm-type: correlated # 消息发送确认
自定义RabbitTemplate
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTemplateConfig {
// 发送确认机制
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置回调方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack){
System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());
}else {
System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);
//相应的业务处理
}
}
});
return rabbitTemplate;
}
}
声明交换机等
import com.example.rabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
//发送方确认
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();
}
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();
}
@Bean("confirmBinding")
public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
}
}
发送消息
import com.example.rabbitmqextensions.constant.Constants;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm() {
CorrelationData correlationData = new CorrelationData("1");
// 正确的交换机和路由键
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);
// 这里修改成不存在的路由键
// confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);
// 这里修改成不存在的交换机
// confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + "1", "confirm", "confirm test...", correlationData);
return "消息发送成功";
}
}
结果
交换机和路由键都正确
交换机错误但路由键正确
交换机正确但路由键错误
可以看出,confirm确认模式只是针对交换机,当交换机正确时,它的confirm方法中的ack就是true,否则就是false。而且交换机和路由键不正确时,它不会保存消息到队列中,也不会退回给生产者,消息也就丢失了。
return回退模式
消息到达Exchange之后, 会根据路由规则匹配, 把消息放入Queue中. Exchange到Queue的过程, 如果⼀条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者. 消息退回给发送者时, 我们可以设置⼀个返回回调方法, 对消息进行处理。
代码
配置文件
同上
自定义RabbitTemplate
// 发送确认机制
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置回调方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack) {
System.out.printf("接收到消息, 消息ID: %s \n", correlationData == null ? null : correlationData.getId());
} else {
System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData == null ? null : correlationData.getId(), cause);
//相应的业务处理
}
}
});
//消息被退回时, 回调方法
// 开启消息退回
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息退回:" + returned);
}
});
return rabbitTemplate;
}
声明交换机等
同上
发送消息
@RequestMapping("/returns")
public String returns() {
CorrelationData correlationData = new CorrelationData("2");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "returns test...", correlationData);
return "消息发送成功";
}
结果
正确的交换机和路由键
交换机错误
路由键错误
可以看出,回退只能把路由键错误的消息回退给生产者,交换机错误的无法回退。
持久化
当消息从生产者到达RabbitMQ的服务器后,必须得先持久化到本地,否则当服务器宕机了,消息也就不存在了。RabbitMQ的持久化分成交换机持久化,队列持久化和消息持久化。
交换机持久化
交换机的持久化是通过在声明交换机时将 durable 参数设置为 true 实现的。这样,交换机的属性会在服务器内部保存,当 RabbitMQ 服务器发生意外或关闭后,重启 RabbitMQ 时,无需重新建立交换机,交换机会自动恢复,相当于一直存在。
如果交换机不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失。对于一个长期使用的交换机来说,建议将其设置为持久化的。
比如刚才创建的交换机就是持久化的。
队列持久化
队列的持久化是通过在声明队列时将 durable 参数设置为 true 实现的。如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,该队列将会被删除,此时数据也会丢失(队列没有了,消息也无处可存)。
队列的持久化能保证该队列本身的元数据不会因异常情况而丢失,但并不能保证内部存储的消息不会丢失。要确保消息不会丢失,需要将消息设置为持久化。
消息持久化
消息实现持久化需要将消息的投递模式(MessageProperties 中的 deliveryMode)设置为 2,也就是 MessageDeliveryMode.PERSISTENT。
package org.springframework.amqp.core;
public enum MessageDeliveryMode {
NON_PERSISTENT,
PERSISTENT;
private MessageDeliveryMode() {
}
public static int toInt(MessageDeliveryMode mode) {
switch(mode) {
case NON_PERSISTENT:
return 1;
case PERSISTENT:
return 2;
default:
return -1;
}
}
public static MessageDeliveryMode fromInt(int modeAsNumber) {
switch(modeAsNumber) {
case 1:
return NON_PERSISTENT;
case 2:
return PERSISTENT;
default:
return null;
}
}
}
设置了队列和消息的持久化后,当 RabbitMQ 服务重启时,消息依旧存在。如果只设置队列持久化,重启之后消息会丢失。如果只设置消息的持久化,重启之后队列消失,继而消息也会丢失。因此,单单设置消息持久化而不设置队列的持久化显得毫无意义。
RabbitMQ 默认情况下会将消息视为持久化,除非队列被声明为非持久化,或者消息在发送时被标记为非持久化。
@RequestMapping("/pres")
public String pres() {
Message message = new Message("Presistent test...".getBytes(), new MessageProperties());
//消息非持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
//消息持久化
// message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
System.out.println(message);
rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);
return "消息发送成功";
}
将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能(随机)。写入磁盘的速度比写入内存的速度慢得不止一点点。因此,对于可靠性不是那么高的消息,可以选择不采用持久化处理,以提高整体的吞吐量。
在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。
消费者接收确认
消息从RabbitMQ服务器发送到消费者手里之后,可能会有两种情况:
- 消息处理成功
- 消息处理异常
当RabbitMQ把消息发送之后,它如果立刻把消息进行删除,那么当消息没有处理成功时,消息就丢失了。所以需要有一个消费者的确认收到消息的机制来防止出现这种情况。对于这种接收消息确认的情况,RabbitMQ提供的SDK和Spring官方提供的依赖有所不同。
RabbitMQ Java Client Library
为了保证消息从队列可靠地到达消费者, RabbitMQ提供了消息确认机制(message
acknowledgement)。
RabbitMQ提供的消息接收确认的处理可以分成自动确认和手动确认。
自动确认
消费者在订阅队列时,可以指定 autoAck 参数。
当 autoAck 等于 true 时,RabbitMQ 会自动将发送出去的消息标记为确认,并从内存(或磁盘)中删除,而不管消费者是否真正消费了这些消息。自动确认模式适合于对消息可靠性要求不高的场景。
当autoAck是false的时候,就是手动确认。
手动确认
- 肯定确认: Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ 已知道该消息并且成功地处理了它,可以将其丢弃。
参数说明:
deliveryTag: 消息的唯一标识,它是一个单调递增的 64 位长整型值。deliveryTag 是每个通道(Channel)独立维护的,因此在每个通道上都是唯一的。当消费者确认(ack)一条消息时,必须使用对应通道上进行确认。
multiple: 是否批量确认。在某些情况下,为了减少网络流量,可以对一系列连续的 deliveryTag 进行批量确认。如果值为 true,则会一次性确认所有小于或等于指定 deliveryTag 的消息。如果值为 false,则只确认当前指定的 deliveryTag 的消息。 - 否定确认: Channel.basicReject(long deliveryTag, boolean requeue)
RabbitMQ 从 2.0.0 版本开始引入了 Basic.Reject 这个命令,消费者客户端可以调用 channel.basicReject 方法来通知 RabbitMQ 拒绝处理该消息。
参数说明:
deliveryTag: 参考 channel.basicAck。这是消息的唯一标识符。
requeue: 表示拒绝后消息的处理方式。
如果 requeue 参数设置为 true,RabbitMQ 会将这条消息重新放入队列,以便可以发送给下一个订阅的消费者。
如果 requeue 参数设置为 false,RabbitMQ 会将消息从队列中移除,不会将其发送给新的消费者。 - 否定确认: Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject 命令一次只能拒绝一条消息。如果需要批量拒绝消息,可以使用 Basic.Nack 这个命令。消费者客户端可以调用 channel.basicNack 方法来实现。
参数说明:
deliveryTag: 消息的唯一标识符。
multiple: 表示是否批量拒绝消息。
如果设置为 true,则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。
如果设置为 false,则仅拒绝指定的 deliveryTag 对应的单条消息。
requeue: 表示拒绝后消息的处理方式。
如果 requeue 参数设置为 true,RabbitMQ 会将这些消息重新放入队列,以便可以发送给下一个订阅的消费者。
如果 requeue 参数设置为 false,RabbitMQ 会将这些消息从队列中移除,不会将其发送给新的消费者。
Spring AMQP
Spring提供的消息接收确认则是分成了三种情况。
AcknowledgeMode.NONE
在这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 就会自动确认该消息,并从 RabbitMQ 队列中移除消息。如果消费者处理消息失败,消息可能会丢失。
代码
配置文件
spring:
rabbitmq:
host: IP地址
port: 端口号
username: 用户名
password: 密码
virtual-host: 虚拟主机
listener:
simple:
acknowledge-mode: none # 自动确认消息,不管成功处理与否,消息都会被删除
生产者
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "consumer ack mode test...");
return "消息发送成功";
}
消费者
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
//消费者逻辑
System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(), StandardCharsets.UTF_8),
message.getMessageProperties().getDeliveryTag());
//进行业务逻辑处理
System.out.println("业务逻辑处理");
Thread.sleep(5000);
// 模拟异常,处理失败
int num = 3/0;
System.out.println("业务处理完成");
}
结果
消息发送后立刻就被删除了。
AcknowledgeMode.AUTO(默认)
这种模式下, 消费者在消息处理成功时会自动确认消息, 但如果处理过程中抛出了异常, 则不会确认消息,并且会重新入队重新发送。
代码
配置文件
spring:
rabbitmq:
host: IP地址
port: 端口号
username: 用户名
password: 密码
virtual-host: 虚拟主机
listener:
simple:
acknowledge-mode: auto
其他代码不变。
结果
AcknowledgeMode.MANUAL
在手动确认模式下,消费者必须在成功处理消息后显式调用 basicAck 方法来确认消息。如果消息未被确认,RabbitMQ 会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息。这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。
代码
配置文件
spring:
rabbitmq:
host: IP地址
port: 端口号
username: 用户名
password: 密码
virtual-host: 虚拟主机
listener:
simple:
acknowledge-mode: manual
消费者代码
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(), StandardCharsets.UTF_8),
message.getMessageProperties().getDeliveryTag());
//进行业务逻辑处理
System.out.println("业务逻辑处理");
Thread.sleep(5000);
// 模拟异常
int num = 3/0;
System.out.println("业务处理完成");
//肯定确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
// 参数一:消息的deliveryTag
// 参数二:是否批量处理
// 参数三:是否确认 true->会重新发送 false->直接丢弃
System.out.println("业务处理失败");
channel.basicNack(deliveryTag, false, true);
}
}
结果
重新投递
直接丢弃
重试机制
在消息传递过程中,可能会遇到各种问题,如网络故障、服务不可用、资源不足等,这些问题可能导致消息处理失败。为了解决这些问题,RabbitMQ 提供了重试机制,允许消息在处理失败后重新发送。然而,如果错误是由程序逻辑引起的,那么多次重试也是无效的。可以设置重试次数来限制重试的次数。这里只介绍在Spring AMQP中的消息重试机制。
配置文件
spring:
rabbitmq:
host: IP地址
port: 端口号
username: 用户名
password: 密码
virtual-host: 虚拟主机
listener:
simple:
acknowledge-mode: auto # 成功确认消息,失败则不确认 只有在该模式下,消息重试机制才会生效
retry:
enabled: true # 开启消费者失败重试
initial-interval: 5000ms # 初始失败等待时⻓为5秒
max-attempts: 3 # 最⼤重试次数(包括⾃⾝消费的⼀次)
消费者处理异常退回消息
// 如果异常被捕获,消息重新发送,即使设置了重试次数,也还是会一直发送的
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void handlerMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);
try {
int num = 3/0;
System.out.println("业务处理完成");
channel.basicAck(deliveryTag, false);
}catch (Exception e){
channel.basicNack(deliveryTag, false, true);
}
}
消费者没有处理异常
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void handlerMessage(Message message) throws UnsupportedEncodingException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);
int num = 3/0;
System.out.println("业务处理完成");
}
在手动确认模式下,重试次数的限制不会像在自动确认模式下那样直接生效,因为是否重试以及何时重试更多地取决于应用程序的逻辑和消费者的实现。
程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是unacked的状态, 导致消息积压。
在自动确认模式下,RabbitMQ 会在消息被投递给消费者后自动确认消息。如果消费者处理消息时抛出异常,RabbitMQ 根据配置的重试参数自动将消息重新入队,从而实现重试。重试次数和重试间隔等参数可以直接在 RabbitMQ 的配置中设定,并且 RabbitMQ 会负责执行这些重试策略。
程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失了。
TTL
TTL(Time to Live,过期时间)是指消息或队列的存活时间。RabbitMQ 允许对消息和队列设置 TTL。当消息到达其设定的存活时间后,如果尚未被消费,它将被自动清除。
网上购物, 经常会遇到⼀个场景, 当下单超过24小时还未付款, 订单会被自动取消;还有类似的, 申请退款之后, 超过7天未被处理, 则自动退款。
RabbitMQ可以对每条消息设置一个ttl,也可以为一个队列设置ttl。
声明相关队列等
//ttl
//队列未设置ttl
@Bean("ttlQueue")
public Queue ttlQueue(){
return QueueBuilder.durable(Constants.TTL_QUEUE).build();
}
//设置队列ttl 方法1
@Bean("ttlQueue2")
public Queue ttlQueue2(){
return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build(); //设置队列的ttl为20s
}
// 设置队列ttl 方法2
@Bean("ttlQueue3")
public Queue ttlQueue3(){
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 20000);
return QueueBuilder.durable(Constants.TTL_QUEUE2).withArguments(map).build(); //设置队列的ttl为20s
}
@Bean("ttlExchange")
public DirectExchange ttlExchange(){
return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();
}
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
}
@Bean("ttlBinding2")
public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
}
生产者代码
// 消息是带ttl
@RequestMapping("/ttl")
public String ttl() {
System.out.println("ttl...");
// 如果下面两条消息都没有被消费,都在等过期时间
// 则 30秒的过期后删除后才会删除20秒的过期消息
// lambda表达式写法
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s...", message -> {
message.getMessageProperties().setExpiration("30000"); //单位: 毫秒, 过期时间为30s
return message;
});
// 实现类写法
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000"); //单位: 毫秒, 过期时间为10s
return message;
}
};
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 10s...", messagePostProcessor);
return "消息发送成功";
}
// 队列带ttl
@RequestMapping("/ttl2")
public String ttl2() {
System.out.println("ttl2...");
//发送普通消息
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...");
return "消息发送成功";
}
设置队列 TTL:
当设置了队列 TTL 后,队列中所有超过过期时间的消息将被自动删除。
由于过期的消息通常位于队列的头部,RabbitMQ 可以定期扫描队列的头部来检查是否有过期消息,然后删除这些过期消息。
设置消息 TTL:当设置了消息 TTL 后,消息的过期时间是单独计算的,即使消息已经过期,它不会立即从队列中删除。
RabbitMQ 在消息即将被投递给消费者之前才会判断消息是否过期。如果消息已过期,则在投递前将其删除。能够避免频繁的队列扫描,从而提高效率。
过期时间 = min(队列有TTL,消息有TTL)
死信队列
死信(Dead Letter)是消息队列中的一种特殊消息,它指的是那些无法被正常消费或处理的消息。在消息队列系统中,如RabbitMQ,死信队列用于存储这些死信消息。
产生原因
- 消息过期: 消息在队列中存活的时间超过了设定的 TTL(存活时间)。
- 消息被拒绝: 消费者在处理消息时,可能因为消息内容错误、处理逻辑异常等原因拒绝处理该消息。如果拒绝时指定不重新入队(requeue=false),该消息也会成为死信。
- 队列满了: 当队列达到最大长度,无法再容纳新的消息时,新来的消息会被处理为死信。
应用场景
在RabbitMQ中,死信队列是一个非常有用的特性。它可以处理异常情况下消息无法被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费死信队列中的内容来分析遇到的异常情况,进而改善和优化系统。具体应用场景包括:
- 用户支付订单后: 支付系统会给订单系统返回当前订单的支付状态。为了保证支付信息不丢失,可以使用死信队列机制。当消息消费异常时,将消息投入死信队列,由订单系统的其他消费者来监听该队列,并对数据进行处理(例如发送工单等,进行人工确认)。
- 消息重试: 将死信消息重新发送到原队列或另一个队列进行重试处理。
- 消息丢弃: 直接丢弃这些无法处理的消息,以避免它们占用系统资源。
- 日志收集: 将死信消息作为日志收集起来,用于后续分析和问题定位。
代码
声明相关队列等
import com.example.rabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DLConfig {
// 消息无法从正常交换机到正常队列,则发送到死信交换机
//正常的交换机和队列
@Bean("normalQueue")
public Queue normalQueue(){
return QueueBuilder.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE)
.deadLetterRoutingKey("dlx")
.ttl(10000) // 队列ttl为10秒
.maxLength(10) // 最大长度为10
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
}
//死信交换机和队列
@Bean("dlQueue")
public Queue dlQueue(){
return QueueBuilder.durable(Constants.DL_QUEUE).build();
}
@Bean("dlExchange")
public DirectExchange dlExchange(){
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
}
@Bean("dlBinding")
public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
}
}
生产者
// 这里先不让消费者消费队列,可以观察到超时和超过长度的消息就进入死信队列了
@RequestMapping("/dl")
public String dl() {
System.out.println("dl...");
//发送普通消息
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test...");
System.out.printf("%tc 消息发送成功 \n", new Date());
//测试队列长度
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."+i);
}
return "消息发送成功";
}
消费者
import com.example.rabbitmqextensions.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// 测试超时和超过队列长度时先注释该注解,不要监听normal.queue
//@Component
public class DLListener {
// 拒绝消息并不放入队列则进入死信队列
@RabbitListener(queues = Constants.NORMAL_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("[normal.queue]接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
//进行业务逻辑处理
System.out.println("业务逻辑处理");
int num = 3/0;
System.out.println("业务处理完成");
//肯定确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag, false, false); //requeue为false, 该消息成为死信
}
}
}
超时+超过长度结果
消息被拒绝并未重新入队
只发送一条消息
延迟队列
延迟队列是一种消息队列,其中消息在被发送到队列后,并不会立即被消费者消费,而是等待指定的时间后再进行消费。这种机制使得消息能够在未来的某个时间点被处理,而不是立刻处理。
应用场景
智能家居场景: 用户希望通过手机远程控制家中的智能设备,在特定的时间执行任务。
解决方案: 将用户的指令发送到延迟队列,设置指令的执行时间。当时间到达时,延迟队列将指令推送到智能设备,实现用户预定的操作。
预定会议后,需要在会议开始前提醒参会人员。
解决方案: 将会议提醒消息发送到延迟队列,设定消息的延迟时间为会议开始前十五分钟。当时间到达时,提醒消息会被发送给参会人员。
用户注册:场景: 用户注册成功后,系统希望在一段时间后发送短信来提高用户活跃度。
解决方案: 将短信发送请求放入延迟队列,设置延迟时间为七天。七天后,延迟队列会将短信发送请求推送到短信服务,完成短信发送。
其他场景:场景: 延迟队列可以用于各种需要定时处理的任务,例如订单处理、定时任务调度等。
解决方案: 根据具体需求,将相应的任务消息发送到延迟队列,设置适当的延迟时间,待到指定时间再进行处理。
RabbitMQ 本身没有直接支持延迟队列的功能,但可以通过 TTL(Time-To-Live)和死信队列(DLX,Dead Letter Exchange)组合的方式来模拟延迟队列的功能。
队列TTL+死信队列
这种写法就和之前队列带TTL的写法一致,只不过没有消费者监听正常队列。
消息TTL+死信队列
当消息带TTL时,如果先发送ttl较大的消息,后发送ttl较小的消息,那么只有等到ttl大的那个消息被消费后ttl较小的消息才会被消费。这样就不符合定时发送的效果。我们可以通过插件来实现。
官方文档:Scheduling Messages with RabbitMQ | RabbitMQ
代码
声明相关交换机等
import com.example.rabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayConfig {
@Bean("delayQueue")
public Queue delayQueue(){
return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
}
@Bean("delayExchange")
public Exchange delayExchange(){
return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();
}
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();
}
}
生产者
@RequestMapping("/delay")
public String delay() {
System.out.println("delay...");
// 先让30s过期消息发送,再发送10s延迟消息 来查看效果
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 30s...", message -> {
message.getMessageProperties().setDelay(30000); //单位: 毫秒, 过期时间为30s
return message;
});
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 10s...", message -> {
message.getMessageProperties().setDelay(10000); //单位: 毫秒, 延迟时间为10s
return message;
});
System.out.printf("%tc 消息发送成功 \n", new Date());
return "消息发送成功";
}
消费者
import com.example.rabbitmqextensions.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class DelayListener {
@RabbitListener(queues = Constants.DELAY_QUEUE)
public void delayHandMessage(Message message, Channel channel) throws Exception {
//消费者逻辑
System.out.printf("[delay.queue] %tc 接收到消息: %s \n", new Date(), new String(message.getBody(),"UTF-8"));
}
}
观察结果
原理
插件的原理是当带有ttl的消息到达交换机时,交换机会先让其等待它的ttl时间,然后再转发给队列。这样队列是按照ttl顺序排序了。
二者对比
基于死信实现的延迟队列
优点:灵活性: 不需要额外的插件支持。利用 RabbitMQ 内置的 TTL(Time-To-Live)和 DLX(Dead Letter Exchange)功能,可以实现延迟队列,避免了对外部插件的依赖。
缺点:消息顺序问题: TTL 和 DLX 可能导致消息的顺序发生变化,因为消息在到达 DLX 队列时的顺序可能与原始顺序不同。
系统复杂性: 需要额外的逻辑来处理死信队列中的消息。这增加了系统的复杂性,可能需要在消费者端进行额外的处理,以确保消息的正确处理。
基于插件实现的延迟队列
优点:简化实现: 通过特定插件可以直接创建延迟队列,简化了延迟消息的实现过程。这些插件通常提供了直观的配置选项,使得设置延迟队列变得更加简单。
避免时序问题: 插件通常会处理好消息的时序问题,确保消息在延迟后按预期的顺序被处理,减少了与 TTL 和 DLX 相关的时序问题。
缺点:插件依赖: 需要依赖特定的插件,这意味着需要进行额外的运维工作,如安装、配置和维护插件。
版本适用性: 可能只适用于特定版本的 RabbitMQ,不同版本间可能存在兼容性问题,限制了插件的灵活性和应用范围。
事务
RabbitMQ 是基于 AMQP 协议实现的,该协议提供了事务机制,因此 RabbitMQ 也支持事务机制。Spring AMQP 也提供了对事务相关的操作。RabbitMQ 的事务机制允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败。
代码
配置RabbitMQTemplate
// 事务
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true); //开启事务
return rabbitTemplate;
}
// 事务管理器
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
相关队列声明
@Bean("transQueue")
public Queue transQueue(){
return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
}
生产者
@Transactional
@RequestMapping("/trans")
public String trans(){
System.out.println("trans test...");
transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test 1...");
int num = 5/0;
transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test 2...");
return "消息发送成功";
}
观察结果
没有一条消息进来
消息分发
RabbitMQ消息分发有两个最大的作用。限流和负载均衡。
限流
在一个订单系统中,正常情况下,每秒最多处理 5000 个请求,可以满足正常的需求。然而,在秒杀活动期间,请求数量瞬间激增,每秒达到 10,000 个请求。如果这些请求全部通过消息队列(MQ)发送到订单系统,将会对订单系统造成巨大的压力,甚至可能导致系统崩溃。
为了解决这个问题,RabbitMQ 提供了限流机制,可以控制消费者一次只拉取一定数量的请求。通过设置 prefetchCount 参数,可以有效实现流控制和负载均衡。需要注意的是,为了使限流机制生效,必须将消息的应答方式设置为手动应答。
prefetchCount: 控制消费者从队列中预取(prefetch)消息的数量,从而实现流控制和负载均衡。
消息应答方式: 必须设置为手动应答,以确保消息在被处理完成之前不会被标记为已处理。
代码
配置代码
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
prefetch: 5 # 每次从队列中获取消息的个数
声明相关队列等
import com.example.rabbitmqextensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QosConfig {
@Bean("qosQueue")
public Queue qosQueue(){
return QueueBuilder.durable(Constants.QOS_QUEUE).build();
}
@Bean("qosExchange")
public Exchange qosExchange(){
return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
}
}
生产者代码
@RequestMapping("/qos")
public String qos() {
System.out.println("qos test...");
//发送普通消息
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test..." + i);
}
return "消息发送成功";
}
消费者代码
@Component
public class QosListener {
// 限流
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(), StandardCharsets.UTF_8),
message.getMessageProperties().getDeliveryTag());
//肯定确认
// 注释掉,暂时让他不确认,观察结果
// channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag, false, true);
}
}
}
负载均衡
消费快的消费者拿到更多的消息,消费慢的消费者则少拿一些消息。
代码
代码基本和上面一样。
配置代码
listener:
simple:
acknowledge-mode: manual
prefetch: 1 #这里要改成1
消费者代码
import com.example.rabbitmqextensions.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class QosListener {
// 限流
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(), StandardCharsets.UTF_8),
message.getMessageProperties().getDeliveryTag());
Thread.sleep(1000);
//肯定确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag, false, true);
}
}
// 负载均衡
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handMessage2(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("222接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(), StandardCharsets.UTF_8),
message.getMessageProperties().getDeliveryTag());
Thread.sleep(2000);
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag, false, true);
}
}
}