文章目录
- 一、消息确认机制
- 🎉1.1 消息发送确认(生产者)
- 🔹confirm 确认模式
- 🔹return 回退模式
- 🚩1.2 消息接收确认(消费者)
- 🔸none 自动确认
- 🔸auto 异常确认
- 🔸manual 手动确认
- 二、消费端限流 (prefetch)
- 三、设置队列参数
- 🍃3.1 消息TTL过期
- 🍃3.2 队列最大长度
- 四、死信队列
- 🍒4.1 构建死信队列
- 🍒4.2 模拟死信消息
- 🍒4.3 实现延迟队列
- 五、消息追踪
- 🍀5.1 Firehose: amq.rabbitmq.trace
- 🍀5.2 rabbitmq_tracing 插件
- 六、应用问题
- 🌹6.1 消息可靠性保障
- 🔸消息补偿机制
- 🌹6.2 消息幂等性保障
- 🔹乐观锁解决方案
提示:以下是本篇文章正文内容,RabbitMQ 系列学习将会持续更新
官网:https://www.rabbitmq.com
一、消息确认机制
RabbitMQ 消息确定主要分为两部分:
- 第一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。
- 确认发送的第一步是确认是否到达交换器。
- 确认发送的第二步是确认是否到达队列。
- 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
🎉1.1 消息发送确认(生产者)
- 消息从 producer 到 exchange 则会返回一个
confirmCallback
。 - 消息从 exchange 到 queue 投递失败则会返回一个
returnCallback
。
🔹confirm 确认模式
①开启确认模式
spring:
rabbitmq:
addresses: 1.15.76.95
username: admin
password: 123456
virtual-host: /test
# 消息可靠传递: 开启确认模式
publisher-confirm-type: correlated
publisher-confirm-type: none
:表示禁用发布确认模式,默认值,使用此模式之后,不管消息有没有发送到 Broker 都不会触发 ConfirmCallback 回调。publisher-confirm-type: correlated
:表示消息成功到达 Broker 后触发 ConfirmCalllBack 回调。publisher-confirm-type: simple
:如果消息成功到达 Broker 后一样会触发 ConfirmCalllBack 回调,发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑。
注意:waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel 信道,则接下来无法发送消息到 broker。
②设置 ConfirmCallback
函数,然后发送消息。
@Test
void publisher1() {
// 1.定义回调
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* confirm方法参数:
* @param correlationData 相关配置信息
* @param ack 交换机是否成功收到消息
* @param cause 交换机接收失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行confirm()方法: ");
if(ack) {
System.out.println("交换机成功收到消息!");
}else {
System.out.println("交换机没有收到消息,失败原因: " + cause);
}
}
});
// 2.发送消息
template.convertAndSend("amq.direct", "my-yyds", "Hello,World");
// 休眠一会儿
Thread.sleep(10000);
}
③运行测试:
a. 我们成功发送了消息,但是控制台依然返回了 ack=false。
原因:当发送方法结束,RabbitMQ 相关的资源也就关闭了。虽然我们的消息发送出去,但异步的 ConfirmCallback 却由于资源关闭无法返回确认信息。
b. 我们可以让主线程休眠一会儿,等 callback 返回确认信息后再关闭资源。
回到目录…
🔹return 回退模式
①开启回退模式
spring:
rabbitmq:
addresses: 1.15.76.95
username: admin
password: 123456
virtual-host: /test
# 消息可靠传递: 开启确认模式
publisher-confirm-type: correlated
# 消息可靠传递: 开启回退模式
publisher-returns: true
②设置 Exchange 处理消息的模式:如果消息没有路由到 Queue
- 默认方式:丢弃消息 。
- 如果设置了
rabbitTemplate.setMandatory(true)
参数,则会将消息退回给 producer。并执行回调函数 returnedMessage。
③设置 ReturnCallBack
函数
@Test
void publisher2() throws InterruptedException {
// 1.设置交换机处理失败消息的模式
template.setMandatory(true);
// 2.设置ReturnCallBack函数
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息对象: " + returnedMessage.getMessage());
System.out.println("错误码: " + returnedMessage.getReplyCode());
System.out.println("错误信息: " + returnedMessage.getReplyText());
System.out.println("交换机: " + returnedMessage.getExchange());
System.out.println("路由键: " + returnedMessage.getRoutingKey());
}
});
// 3.发送消息
template.convertAndSend("amq.direct", "my-yyds1", "Hello,World");
Thread.sleep(10000);
}
④运行测试:可以看到路由错误。
回到目录…
🚩1.2 消息接收确认(消费者)
🔸none 自动确认
acknowledge-mode=none
,默认方式,默认消费者能正确处理所有请求。
🔸auto 异常确认
acknowledge-mode=auto
,根据异常情况决定处理方式。
- 如果消费者在消费的过程中没有抛出异常,则自动确认。
- 当消费者消费的过程中抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且该消息不会重回队列。
- 当抛出 ImmediateAcknowledgeAmqpException 异常,消息会被确认。
- 如果抛出其他的异常,则消息会被拒绝,但是与前两个不同的是,该消息会重回队列,如果此时只有一个消费者监听该队列,那么该消息重回队列后又会推送给该消费者,会造成死循环的情况。
🔸manual 手动确认
- 在该模式下,消费者消费消息后需要根据消费情况给 Broker 返回一个回执,是确认
ack
使 Broker 删除该条已消费的消息,还是失败确认返回nack
,还是拒绝该消息。 - 开启手动确认后,消费者接收到消息后必须返回
ack
,只有 RabbitMQ 接收到ack
后,消息才会从队列中被删除。如果不调用 channel.basicAck() 会出现该消息被重复消费的情况。
①设置手动签收,acknowledge =manual
spring:
rabbitmq:
addresses: 1.15.76.95
username: admin
password: 123456
virtual-host: /test
listener:
# 设置监听器类型,如不设置将会默认为SimpleRabbitListenerContainerFactory,那么下面的direct配置不生效
type: direct
direct:
# 设置消费端手动确认
acknowledge-mode: manual
②监听器类实现 ChannelAwareMessageListener
接口
- 目的是为了实现它的 onMessage(Message message, Channel channel) 方法,因为我们需要 channel 连接器。
- 也可以不实现接口,直接在定义接收方法时,引入 Channel 参数。
③如果消息成功处理,则调用 channel.basicAck()
签收。
④如果消息处理失败,则调用 channel.basicNack()
拒绝签收,broker 重新发送给 consumer。
@Component
public class AckListener {
@RabbitListener(queues = "yyds")
public void onMessage(Message message, Channel channel) throws Exception {
// 先获取当前的消息标签
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1.接收消息
System.out.println(new String(message.getBody()));
// 2.处理业务逻辑
System.out.println("处理业务逻辑...");
//int a = 3 / 0; //模拟出错的场景
// 3.手动签收
channel.basicAck(deliveryTag, false);
}catch(Exception e) {
// 4.拒绝签收
System.out.println("消息接收失败,让消息重回队列");
channel.basicNack(deliveryTag, false, true);
}
}
}
消息确认的方法 | 参数一 | 参数二 | 参数三 | 描述 |
---|---|---|---|---|
channel.basicAck(deliverTag, true) | 消息标签 | 是否接收多条消息 | ——— | 确认应答 |
channel.basicNack(deliverTag, false, true) | 消息标签 | 是否接收多条消息 | 是否让消息重回队列 | 拒绝应答 |
channel.basicReject(deliverTag, false) | 消息标签 | 是否让消息重回队列 | ——— | 拒绝消息 |
channel.basicRecover(true) | 是否恢复给其它消费者 | ——— | ——— | 恢复消息 |
⑤启动测试:
如果接收者成功消费消息,则没有任何返回。
如果接收者消费消息失败,则采用 Nack 让消息重回队列,并且自己可以再次消费重回后的消息。
回到目录…
二、消费端限流 (prefetch)
和单纯 ACK 区别:ACK 会一次拉取所有消息,而 prefetch 可以设置每次消费的消息数量
- 用于指定消费端处理消息的速率。
- 用于保证系统稳定性,削峰填谷时的处理速率。
①该功能必须设置 ACK 方式:手动确认 acknowledge-mode: manual
且 prefetch=条数
spring:
rabbitmq:
listener:
type: direct
direct:
acknowledge-mode: manual #手动确认
prefetch: 3 #消费者每次消费的数量
②我们先批量插入20条消息
@Test
void publisher() {
for (int i = 0; i < 20; i++) {
template.convertAndSend("amq.direct", "my-yyds", "Hello,World: " + i);
}
}
③设置监听器:消费者手动确认消息
@Component
public class QosListener {
@RabbitListener(queues = "yyds")
public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
// 1.接收消息
System.out.println("接收消息: " + new String(message.getBody()));
// 2.处理业务
System.out.println("处理业务中...");
Thread.sleep(1000);
// 3.签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //设置参数可以签收多条消息
}
}
④启动测试:可以看到我们接收到的消息是3条/批
回到目录…
三、设置队列参数
🍃3.1 消息TTL过期
RabbitMQ 支持将超过一定时间没被消费的消息自动删除,这需要消息设定 TTL
值,如果消息的存活时间超过了 Time To Live
值,就会被自动删除。如果有死信队列,那么就会进入到死信队列中。
①直接给队列设定 TTL 值(毫秒为单位): 过期会删除队列中的所有消息。
@Bean("yydsQueue")
public Queue queue(){
return QueueBuilder
.nonDurable("yyds")
.deadLetterExchange("dlx.direct")
.deadLetterRoutingKey("dl-yyds")
.ttl(10000) //如果10秒没处理,就自动删除
.build();
}
现在我们删除之前的 yyds 队列再重启测试:可以发现 yyds 队列已经具有 TTL 特性了。
②生产者给某个消息设定过期时间: 使用 expiration
参数。只有消息在双端时,才是计时。
@Test
void publisher() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000"); //设置过期时间 ms
return message;
}
};
template.convertAndSend("amq.direct", "my-yyds", "Hello,TLL", messagePostProcessor);
}
现在删除之前的 yyds 队列再重启测试:发现队列并没有 TTL 特性。
只是我们发布的消息会过期,不影响其它消息。
这种情况就不会过期:当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
回到目录…
🍃3.2 队列最大长度
我们来看一下当消息队列长度达到最大的情况,现在我们将消息队列的长度进行限制:
@Bean("yydsQueue")
public Queue queue(){
return QueueBuilder
.nonDurable("yyds")
.maxLength(3) //将最大长度设定为3
.build();
}
现在我们重启一下:可以发现 yyds 队列已经具有 Limit 特性了。
我们向 yyds 队列中依次插入4条消息:person-1
、person-2
、person-3
、person-4
可以看到因为长度限制为3,所以最开始的消息直接被丢弃了。
回到目录…
四、死信队列
使用场景:
● 如果消息队列中的数据迟迟没有消费者来处理,那么就会一直占用消息队列的空间。
● 比如我们模拟一下抢车票的场景,用户下单高铁票之后,会进行抢座,然后再进行付款,但是如果用户下单之后并没有及时的付款,这张票不可能一直让这个用户占用着,因为你不买别人还要买呢,所以会在一段时间后超时,让这张票可以继续被其他人购买。
● 这时,我们就可以使用死信队列,将那些用户超时未付款的或是用户主动取消的订单,进行进一步的处理。
消息成为死信的三种情况:
● 消息被拒绝 (basic.reject
/ basic.nack
),并且 requeue = false
不重回队列
● 消息TTL过期
● 队列达到最大长度
那么如何构建这样的模式呢?
实际上本质就是一个死信交换机 + 绑定的死信队列。当正常队列中的消息被判定为死信时,会被发送到对应的死信交换机,然后再通过交换机发送到死信队列中,死信队列也有对应的消费者去处理消息。
🍒4.1 构建死信队列
①这里我们直接在配置类中创建一个新的死信交换机和死信队列,并进行绑定:
@Configuration
public class RabbitConfiguration {
@Bean("directDlExchange") //创建一个新的死信交换机
public Exchange dlExchange() {
return ExchangeBuilder.directExchange("dlx.direct").build();
}
@Bean("yydsDlQueue") //创建一个新的死信队列
public Queue dlQueue() {
return QueueBuilder
.nonDurable("dl-yyds") //队列名称
.build();
}
@Bean("dlBinding") //死信交换机和死信队列进绑定
public Binding dlBinding(@Qualifier("directDlExchange") Exchange exchange,
@Qualifier("yydsDlQueue") Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dl-yyds") //自定义routingKey
.noargs();
}
@Bean("directExchange")
.......................
@Bean("yydsQueue") //在普通消息队列中指定死信交换机
public Queue queue(){
return QueueBuilder
.nonDurable("yyds")
.deadLetterExchange("dlx.direct") //指定死信交换机
.deadLetterRoutingKey("dl-yyds") //指定死信RoutingKey
.build();
}
@Bean("binding")
................
}
②接着我们将监听器修改为死信队列监听:
@Component
public class ConsumeListener {
@RabbitListener(queues = "dl-yyds", messageConverter = "jacksonConverter")
public void receiver(User user){
System.out.println("死信队列监听器: " + user);
}
}
启动一下:我们可以看到多了一个死信交换机。
队列列表中也多了一个死信队列,并且 yyds 队列也支持死信队列发送功能了。
回到目录…
🍒4.2 模拟死信消息
场景一:消息被拒绝
现在我们先向 yyds 队列中发送一个消息:Hello,World
然后我们取消息的时候拒绝消息,并且不让消息重新排队:
可以看到拒绝后,如果不让消息重新排队,那么就会直接被丢进死信队列中:
场景二:消息TTL过期
如果消息的存活时间超过了 Time To Live
值,就会被自动删除。如果有死信队列,那么就会进入到死信队列中。
现在我们将 yyds 消息队列设定 TTL 值(毫秒为单位):
@Bean("yydsQueue")
public Queue queue(){
return QueueBuilder
.nonDurable("yyds")
.deadLetterExchange("dlx.direct")
.deadLetterRoutingKey("dl-yyds")
.ttl(10000) //如果10秒没处理,就自动删除
.build();
}
启动测试:我们可以看到 死信队列 和 普通队列 (TTL
、DLX
死信交换机、DLK
死信routingKey):
我们向 yyds 队列中插入一个新的消息:Hello,World
可以看到消息10秒钟之后就不见了,而是被丢进了死信队列中。
场景三:队列达到最大长度
最后我们来看一下当消息队列长度达到最大的情况,现在我们将消息队列的长度进行限制:
@Bean("yydsQueue")
public Queue queue(){
return QueueBuilder
.nonDurable("yyds")
.deadLetterExchange("dlx.direct")
.deadLetterRoutingKey("dl-yyds")
.maxLength(3) //将最大长度设定为3
.build();
}
启动测试:我们可以看到 死信队列 和 普通队列(Lim
、DLX
、DLK
):
我们向 yyds 队列中依次插入4条消息:Message-1
、Message-2
、Message-3
、Message-4
可以看到因为长度限制为3,所以最开始的消息直接被丢进了死信队列中了。
回到目录…
🍒4.3 实现延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
场景一:下单后,30分钟未支付,取消订单,回滚库存。
场景二:新用户注册成功7天后,发送短信问候。
我们可以使用 TTL+死信队列
组合实现延迟队列的效果。
这里就不演示 TTL+死信队列
的组合了,和上面一样,我们来实现一下消费者端吧!
@Component
public class DlxListener {
@RabbitListener(queues = "dl-yyds")
public void receiver(Message message, Channel channel) throws Exception {
try {
System.out.println("死信队列监听器: " + new String(message.getBody()));
System.out.println("======== 业务处理中 ==========");
System.out.println("根据订单id查询状态:");
System.out.println(" 如果订单支付成功,向物流系统发送发货的请求。");
System.out.println(" 如果订单支付失败,取消订单,回滚库存。");
System.out.println("======== 确认消息签收 ==========");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch(Exception e) {
System.out.println("出现异常,拒绝签收, 让消息重回队列");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
启动测试:
手动向 yyds 队列中发一条消息:订单:{id:1, 状态:xxx, 下单时间: yyyy-m-d HH:MM:SS}
消息过期后,成功被死信队列监听到。
回到目录…
五、消息追踪
🍀5.1 Firehose: amq.rabbitmq.trace
- Firehose 的机制是将生产者投递给 RabbitMQ 的消息,RabbitMQ 投递给消费者的消息按照指定的格式发送到默认的 Exchange 上。
- 这个默认的 Exchange 的名称为
amq.rabbitmq.trace
,它是一个 topic 类型的内部交换机。 - 发送到该交换机上的消息的 routingKey 为
publish.exchangename
(生产者发布的消息) 和deliver.queuename
(消费者获取的消息)。
应用:amq.rabbitmq.trace 交换机实现消息追踪
可以看到它也是 topic
类型的,它是一个内部交换机,用于帮助我们记录和追踪生产者和消费者使用消息队列的交换机。
①首先,我们需要在控制台将虚拟主机 /test
的追踪功能开启:
rabbitmqctl trace_on -p /test
②创建一个 trace 消息队列用于接收记录:
③我们给 amq.rabbitmq.trace 交换机绑定上刚刚的队列: 因为该交换机是内部的,所以只能在 Web 管理页面中绑定
由于发送到此交换机上的 routingKey 为 publish.交换机名称
和 deliver.队列名称
,分别对应生产者投递到交换机的消息,和消费者从队列上获取的消息,因此这里使用 #
通配符进行绑定。
④现在我们来测试一下,往 yyds 队列中发送消息: 会发现 trace 队列中多了2条信息。
通过追踪,我们可以很明确地得知消息发送的交换机、routingKey、用户等信息,包括信息本身:
同样的,消费者在取出数据时也有记录:我们可以明确消费者的地址、端口、具体操作的队列以及取出的消息信息等。
回到目录…
🍀5.2 rabbitmq_tracing 插件
rabbitmq_tracing 和 Firehose 在实现上如出一辙,只不过 rabbitmq_tracing 的方式比 Firehose 多了一
层GUI
的包装,更容易使用和管理。
缺点:会降低 RabbitMQ 的整体性能,不适用于生产开发中,适用于测试和调试阶段。
①启用插件
# 查看插件列表
rabbitmq-plugins list
# 启动插件
rabbitmq-plugins enable rabbitmq_tracing
②创建 Trace 追踪队列
同时多了一条队列,点击查看发现:实际上它也是绑定了我们的 amq.rabbitmq.trace
交换机。
③测试:向 yyds 队列中发布并消费消息,然后查看 Trace log
日志 (消息的生产者和消费者)
回到目录…
六、应用问题
🌹6.1 消息可靠性保障
🔸需求:确保消息 100% 发送成功。
🔸消息补偿机制
🌹6.2 消息幂等性保障
🔹需求:在MQ中,对于相同的消息消费一次和消费多次,最终的结果一致。
🔹乐观锁解决方案
回到目录…
总结:
提示:这里对文章进行总结:
本文是对RabbitMQ高级特性的学习,我们首先学习了消息确认机制和消费端限流的方法,又通过设置队列的参数实现了死信队列和延时队列。后面又介绍了两种方式实现消息追踪,最后也介绍了消息可靠性和消息幂等性的解决方案。之后的学习内容将持续更新!!!