目录
一.高级特性
1.1消息的可靠投递
2.1Consumer Ack
3.1消费端限流
4.1TTL
5.1死信队列
6.1延迟队列
7.1日志与监控
7.1.1日志
7.1.2监控
8.1消息追踪
8.1.1Firehose
8.1.2rabbitmq_tracing
9.1消息可靠性保障(思路)
9.2消息幂等性保障(思路)
一.高级特性
1.1消息的可靠投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提 供了两种方式用来控制消息的投递可靠性模式。
①confirm确认模式
②return退回模式
rabbitmq 整个消息投递的路径为: producer--->rabbitmqbroker--->exchange--->queue--->consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
接基础篇,在生产者中RabttitMQConfig中配置
public static final String CONFIRM_EXCHANGE_NAME = "test_exchange_confirm";
public static final String CONFIRM_QUEUE_NAME = "test_queue_confirm";
@Bean("confirmExchange")
public Exchange confirmExchange(){
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).build();
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding bindConfirmExchangeQueue(@Qualifier("confirmExchange") Exchange exchange,
@Qualifier("confirmQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
}
在yml中开启
#配置RabbitMQ的基本信息
spring:
rabbitmq:
host: ip
username: admin
password: admin
port: 5672
virtual-host: /
publisher-confirms: true //开启确认模式,这个是老版本的写法,已经废弃了,新版本在后面加type
publisher-returns: true //开启退回模式
测试类
/*
*
* 确认模式步骤
* 1.确认模式开启,在yml中配置
* 2.在rabbitTemplate定义ConfirmCallback回调函数
*
* */
@Test
public void testConfirm(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关的配置信息
* @param ack exchange交换机 是否成功收到了消息,true成功,false失败
* @param cause 如果ack为false,这是失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
//接收成功
System.out.println("接收成功消息:"+cause);
}else {
//接收失败
System.out.println("接收失败消息:"+cause);
//做一些处理,再次发送
}
System.out.println("Confirm方法被执行……");
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME,"confirm","message..confirm...");
}
以上配置方法也可以写在配置类中
回退方式测试类
/**
* 回退模式:当消息发送给Exchange后,Exchange路由到Queue失败时才会执行ReturnCallback
* 1.开启回退模式
* 2.设置ReturnCallback
* 3.设置Exchange处理消息的模式
* 消息没有路由到Queue,丢弃消息
* 如果消息没有路由到Queue,返回给消息发送方
*/
@Test
public void testCallback() throws InterruptedException {
//设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replycode, String replyText, String exchange, String routingKey) {
System.out.println("return执行了……");
System.out.println(message);
System.out.println(replycode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME,"confirm","message..confirm...");
Thread.sleep(3000);
}
消费者确认再继续看下面
2.1Consumer Ack
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式: 自动确认:acknowledge="none" •
手动确认:acknowledge="manual" •
根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的 消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则 调用channel.basicNack()方法,让其自动重新发送消息
在消费者工程中,新建监听者
/**
* Counsumer ACK
* 1.设置手动签收 在yml
* 2.让监听器类实现ChannelAwareMessageListener
* 3.如果消息成功处理,调用channel的basicAck进行签收
* 4.basicNack拒收 broker重新发送给counsumer
*/
@Component
public class ACKListener {
@RabbitListener(queues = "test_queue_confirm")
public void ListenerQueue2(Message message,Channel channel) throws IOException {
System.out.println("test_queue_confirm----->"+message);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("处理业务逻辑");
Thread.sleep(3000);
int i = 10/0;
/**
* long deliveryTag,收到消息的标识
* boolean multiple,是否签收多条消息
*/
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//拒绝签收 第三个参数设置为true,消息重新回到queue,broker重新发送该消息给消费端
// channel.basicNack(deliveryTag,true,true);
// 如果真得出现了异常,我们采用消息重投,获取redelivered,判断是否为重投: false没有重投,true重投
Boolean redelivered = message.getMessageProperties().getRedelivered();
System.out.println("redelivered = " + redelivered);
try {
// (已重投)拒绝确认
if (redelivered) {
/**
* 拒绝确认,从队列中删除该消息,防止队列阻塞(消息堆积)
* boolean requeue: false不重新入队列(丢弃消息)
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
System.out.println();
} else { // (没有重投) 消息重投
/**
* 消息重投,重新把消息放回队列中
* boolean multiple: 单条或批量
* boolean requeue: true重回队列
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.out.println("=========消息重投了=======");
}
} catch (Exception ee) {
ee.printStackTrace();
}
}
}
}
配置yml
#配置RabbitMQ的基本信息
spring:
rabbitmq:
host: ip
username: admin
password: admin
port: 5672
virtual-host: /
listener:
simple:
# 并发消费:每个侦听器线程的最小数量,具体数值根据系统性能配置(一般为系统cpu核数)
concurrency: 2
# 并发消费:每个侦听器线程的最大数量,具体数值根据系统性能配置(一般为系统cpu核数*2)
max-concurrency: 4
# 每次只能获取一条消息,处理完成才能获取下一个消息,避免照成消息堆积在一个消费线程上
prefetch: 1
#acknowledge-mode: manual # 消费者开启手动ack消息确认,需要测试请看示例请AckConsumer,所有队列都会生效
#default-requeue-rejected: false # 设置为false,会重发消息到死信队列(防止手动ack确认失败的消息堆积),需要测试请示例AckConsumer,所有队列都会生效
retry:
enabled: true # 解决消息死循环问题-启用重试
max-attempts: 3 # 最大重试3次(默认),超过就丢失(或放到死信队列中,防止消息堆积)
multiplier: 2 # 乘子
initial-interval: 3000 # 第一次和第二次之间的重试间隔,后面的用乘子计算 3s 6s 12s
max-interval: 16000 # 最大重试时间间隔16s
direct:
acknowledge-mode: manual
default-requeue-rejected: false
小结
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
持久化 exchange要持久化
queue要持久化
message要持久化
生产方确认Confirm
消费方确认Ack
Broker高可用
3.1消费端限流
在yml中配置 prefetch属性设置消费端一次拉取多少消息
prefetch: 1
消费端的确认模式一定为手动确认。
acknowledge-mode: manual
详细配置见上面消费者的yml
4.1TTL
TTL 全称 Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
可以直接在RabbitMQ的web端直接对队列进行设置
也可以通过java代码对其进行设置,具体代码见后面死信队列与延迟队列
小结
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断 这一消息是否过期。
如果两者都进行了设置,以时间短的为准。
5.1死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以 被重新发送到另一个交换机,这个交换机就是DLX
消息成为死信的三种情况:
1. 队列消息长度到达限制;
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标列,requeue=false;
3. 原队列存在消息过期设置,消息到达超时时间未被消费
正常队列绑定死信交换机: 给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
生产者
RabbitMQConfig中配置一对正常交换机队列、一堆死信交换机队列、正常队列与死信交换机绑定
//死信
//先声明正常的交换机(test_exchange_dlx)和队列(test_queue_dlx)
//声明死信交换机(exchange_dlx)和队列(queue_dlx)
//正常队列绑定死信交换机
//设置两个参数
//x-dead-letter-exchange 死信交换机名称
//x-dead-letter-routing-key 发送给死信交换机的routing key
@Bean("test_exchange_dlx")
public Exchange test_exchange_dlx(){
return ExchangeBuilder.topicExchange("test_exchange_dlx").durable(true).build();
}
@Bean("test_queue_dlx")
public Queue test_queue_dlx(){
Map<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","exchange_dlx");
map.put("x-dead-letter-routing-key","dlx.hehe");
map.put("x-message-ttl",10000);
map.put("x-max-length",10);
return QueueBuilder.durable("test_queue_dlx").withArguments(map).build();
}
@Bean
public Binding dlxCommonBinding(@Qualifier("test_exchange_dlx") Exchange exchange,
@Qualifier("test_queue_dlx") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();
}
@Bean("exchange_dlx")
public Exchange exchange_dlx(){
return ExchangeBuilder.topicExchange("exchange_dlx").durable(true).build();
}
@Bean("queue_dlx")
public Queue queue_dlx(){
return QueueBuilder.durable("queue_dlx").build();
}
@Bean
public Binding dlxBinding(@Qualifier("exchange_dlx") Exchange exchange,
@Qualifier("queue_dlx") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
}
测试类
/**
* 发送测试死信消息
* 1.过期时间
* 2.长度限制
* 3.消息拒收
*/
@Test
public void testDlx(){
//1.过期时间
// rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死亡");
//2.长度限制
// for (int i = 1; i <= 20 ; i++) {
// rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死亡");
// }
//3.消息拒收
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死亡");
}
消费者监听,监听正常队列
@Component
public class DLXListener {
@RabbitListener(queues = "test_queue_dlx")
public void ListenerQueue3(Message message,Channel channel) throws IOException {
System.out.println("test_queue_dlx----->"+message);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("处理业务逻辑");
Thread.sleep(3000);
int i = 10/0;
/**
* long deliveryTag,收到消息的标识
* boolean multiple,是否签收多条消息
*/
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//拒绝签收 第三个参数设置为true,消息重新回到queue,broker重新发送该消息给消费端
// channel.basicNack(deliveryTag,true,true);
// 如果真得出现了异常,我们采用消息重投,获取redelivered,判断是否为重投: false没有重投,true重投
Boolean redelivered = message.getMessageProperties().getRedelivered();
System.out.println("redelivered = " + redelivered);
try {
// (已重投)拒绝确认
if (redelivered) {
System.out.println("拒绝了,不重投");
/**
* 拒绝确认,从队列中删除该消息,防止队列阻塞(消息堆积)
* boolean requeue: false不重新入队列(丢弃消息)
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
System.out.println();
} else { // (没有重投) 消息重投
/**
* 消息重投,重新把消息放回队列中
* boolean multiple: 单条或批量
* boolean requeue: true重回队列
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.out.println("=========消息拒绝接收,重投了=======");
}
} catch (Exception ee) {
ee.printStackTrace();
}
}
}
}
观察死信队列中消息数量变化
6.1延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
1. 下单后,30分钟未支付,取消订单,回滚库存。
2. 新用户注册成功7天后,发送短信问候。
实现方式: 1. 定时器 2. 延迟队列
定时器对于性能消耗很大,所以采取延迟队列
由于RabbitMQ没有直接的延迟队列,故而采取ttl+死信队列来实现
生产者
RabbitMQConfig中配置
//延迟
//先声明正常的交换机(order_exchange)和队列(order_queue)
//声明死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
//正常队列绑定死信交换机
//设置两个参数
//x-dead-letter-exchange 死信交换机名称
//x-dead-letter-routing-key 发送给死信交换机的routing key
@Bean("order_exchange")
public Exchange order_exchange(){
return ExchangeBuilder.topicExchange("order_exchange").durable(true).build();
}
@Bean("order_queue")
public Queue order_queue(){
Map<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","order_exchange_dlx");
map.put("x-dead-letter-routing-key","dlx.order.cancel");
map.put("x-message-ttl",10000);
map.put("x-max-length",10);
return QueueBuilder.durable("order_queue").withArguments(map).build();
}
@Bean
public Binding yanchiCommonBinding(@Qualifier("order_exchange") Exchange exchange,
@Qualifier("order_queue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
@Bean("order_exchange_dlx")
public Exchange order_exchange_dlx(){
return ExchangeBuilder.topicExchange("order_exchange_dlx").durable(true).build();
}
@Bean("order_queue_dlx")
public Queue order_queue_dlx(){
return QueueBuilder.durable("order_queue_dlx").build();
}
@Bean
public Binding yanchiBinding(@Qualifier("order_exchange_dlx") Exchange exchange,
@Qualifier("order_queue_dlx") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();
}
测试类
@Test
public void testDelay() throws InterruptedException {
//发送订单消息
rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息 id:1");
//打印倒计时10s
for (int i = 0; i < 10; i++) {
System.out.println(i+"...");
Thread.sleep(1000);
}
}
消费者监听,监听的是死信队列
@Component
public class OrderListener {
@RabbitListener(queues = "order_queue_dlx")
public void ListenerQueue4(Message message,Channel channel) throws IOException {
System.out.println("order_queue_dlx----->"+message);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("处理业务逻辑");
Thread.sleep(3000);
System.out.println("根据订单id查询状态");
System.out.println("判断状态是否为支付成功");
System.out.println("取消订单,回滚……");
/**
* long deliveryTag,收到消息的标识
* boolean multiple,是否签收多条消息
*/
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//拒绝签收 第三个参数设置为true,消息重新回到queue,broker重新发送该消息给消费端
// channel.basicNack(deliveryTag,true,true);
// 如果真得出现了异常,我们采用消息重投,获取redelivered,判断是否为重投: false没有重投,true重投
Boolean redelivered = message.getMessageProperties().getRedelivered();
System.out.println("redelivered = " + redelivered);
try {
// (已重投)拒绝确认
if (redelivered) {
System.out.println("拒绝了,不重投");
/**
* 拒绝确认,从队列中删除该消息,防止队列阻塞(消息堆积)
* boolean requeue: false不重新入队列(丢弃消息)
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
System.out.println();
} else { // (没有重投) 消息重投
/**
* 消息重投,重新把消息放回队列中
* boolean multiple: 单条或批量
* boolean requeue: true重回队列
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.out.println("=========消息拒绝接收,重投了=======");
}
} catch (Exception ee) {
ee.printStackTrace();
}
}
}
}
小结
1.延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。
7.1日志与监控
7.1.1日志
RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log
日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、 RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等
7.1.2监控
web管控台监控
rabbitmqctl管理和监控
8.1消息追踪
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能 是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也 有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪
8.1.1Firehose
firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式 发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类 型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。
注意:打开 trace 会影响消息写入功能,适当打开后请关闭。
rabbitmqctl trace_on:开启Firehose命令
rabbitmqctl trace_off:关闭Firehose命令
步骤:
1.创建一个新的队列,然后将这个队列绑定到默认交换机上,Routing Key写#,表示所有消息都监听
2.然后在这个新的队列发一条消息,然后再查,发现只有一条
3.然后开启rabbitmqctl trace_on
4.再发一条消息,再查,发现有额外的消息
8.1.2rabbitmq_tracing
rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一 层GUI的包装,更容易使用和管理。
启用插件:rabbitmq-plugins enable rabbitmq_tracing
然后即可在web端右侧看到Tracing,点进去即可查看与操作,可以创建一个新的,Pattern写#
9.1消息可靠性保障(思路)
由于比较复杂,只提供一个思路,思路多种多样,这个不一定好,看看就好
9.2消息幂等性保障(思路)
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。
也就是说,其任 意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
比如发送两条相同的扣款信息,不能扣两次款。
采取乐观锁机制。
本文只是本人在学习复习过程中的笔记,有什么不足之处请指教。