RabbitMQ快速入门之进阶
进阶
- RabbitMQ快速入门之进阶
- 1、confirm 模式的设置
- 2、return 退回模式的处理
- 3、消费者 Ack,手动确认
- 4、消费端限流 (流量削缝)
- 5、TTL存活时间过期时间
- 6、死信队列DLX
- 7、延迟队列 (TTL + DLX)
1、confirm 模式的设置
*confirm 确认模式
*return 退回模式
- 消息从生产者到交换机会返回一个confirmCallback
- 消息从交换机 -->队列投递失败会返回一个returnCallback
1、confirm 模式的设置
注意:本文使用的是springboot来进行处理的,springboot配置RabbitMQ可去观看:RabbitMQ快速入门
1、配置文件 publisher-confirm-type: correlated
2、在rabbitTemplate上定义ConfirmCallBack回调函数
confirm中的参数:
/**
* @param correlationData 相关的配置信息
* @param ack 交换机是否成功收到消息 true:成功
* @param cause 失败的原因,用ack来判断,来进行输出错误信息
*/
配置文件:
spring:
rabbitmq:
host: 192.168.92.134
port: 5672
username: zww
password: 123456
virtual-host: /
# Add this sentence
publisher-confirm-type: correlated
1.1生产者配置:
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "boot_queue_confirm";
public static final String EXCHANGE_NAME="boot_exchange_confirm";
// 1、交换机的配置
@Bean("bootExchange")
public Exchange exchange(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 2、队列的配置
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
// 3、绑定关系 Binging
/**
* 1、知道哪个队列
* 2、知道哪个交换机
* 3、设置对应的 routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
1.2生产者测试模块
/**
* 确认模式
* 1、配置文件 publisher-confirm-type: correlated
* 2、在rabbitTemplate上定义ConfirmCallBack回调函数
*/
@Test
public void testConfirm(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 相关的配置信息
* @param ack 交换机是否成功收到消息 true:成功
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");
// 接收成功
if (ack){
System.out.println("消息接收成功");
}else {
// 接收失败(把交换机的名字,改成不存在的,然后在运行)
System.out.println("接收失败,原因:"+cause );
// 对于失败,做一些处理
}
}
});
// EXCHANGE_NAME
rabbitTemplate.convertAndSend(EXCHANGE_NAME,"boot.confirm","传的信息:message confirm");
}
1.3:运行结果(把交换机名称换成不存在的,打印错误信息)
2、return 退回模式的处理
2、return 退回模式的处理
* 回退模式:当消息发送给Exchange后,路由器路由到Queue失败之后,才会执行ReruenCallBack
* 1、开启回退模式
* publisher-returns: true
* 2、设置ReturnCallBack
* 3、设置Exchange处理消息的模式
* 3.1、如果消息没有路由到Queue,则丢弃(默认)
* 3.2、如果消息没有路由到Queue,返回给消息发送方
* rabbitTemplate.setMandatory(true);
2.1、生产者测试模块
/**
* 回退模式:当消息发送给Exchange后,路由器路由到Queue失败之后,才会执行ReruenCallBack
* 1、开启回退模式
* publisher-returns: true
* 2、设置ReturnCallBack
* 3、设置Exchange处理消息的模式
* 3.1、如果消息没有路由到Queue,则丢弃(默认)
* 3.2、如果消息没有路由到Queue,返回给消息发送方
* rabbitTemplate.setMandatory(true);
*/
@Test
public void testReturn(){
// 设置交换机处理失败消息模式
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
/**
* 消息失败了,会调用这个方法
* 注意:要进行处理失败的处理 rabbitTemplate.setMandatory(true);
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
// 失败的话,就要进行想对应的处理
}
});
// 这里的key填的是错误的,这样就不能到Queue
rabbitTemplate.convertAndSend(EXCHANGE_NAME,"boot1.confirm","传的信息:message confirm");
}
2.2、运行结果
这里我们把错误的对应信息都给打印出来了。
3、消费者 Ack,手动确认
3、消费者 Ack
* Consumer Ack: 消费端收到消息之后的确认方式
* 1、自动确认 acknowledge = "none"
* 2、手动确认 acknowledge = "manual"
* 3、根据异常情况确认: acknowledge = "auto"
3.1、ACK机制:
ACK机制:
1、设置手动签收。配置文件中添加:
listener:
direct:
acknowledge-mode: manual
2、@RabbitListener注解中,要进行ackMode = "MANUAL"的设置
3、监听器类实现: ChannelAwareMessageListener接口
4、处理成功,调用channel的basicNack()签收
5、如果消息处理失败,调用channel的basicNack()拒绝签收
3.2 消费者类
/**
* 没有出现异常就使用basicAck来进行手动签收
* 出现异常就进行basicNack来进行重新尝试
*/
@Component
public class AckListener implements ChannelAwareMessageListener {
/**
* ackMode = "MANUAL"要进行这个的设置,不然就还是自动的,那么就会有冲突,会出现错误
* @param message
* @param channel
* @throws Exception
*/
@org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "boot_queue_confirm"
,ackMode = "MANUAL")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1、接收消息
System.out.println(message.getBody().toString());
// 2、处理业务逻辑
System.out.println("处理业务逻辑");
// 失败的处理
int i = 3/0;
// 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
// 拒接签收 第三个参数:requeue:重回队列,设置成true,那么消息会回到queue,会重新发送消息给消费端。
// 出错的话,就会一直进行执行,重新尝试
channel.basicNack(deliveryTag,true,true);
}
}
}
3.3 运行结果
运行int i = 3/0; 会一直出错,然后就会进行报错的处理
4、消费端限流 (流量削缝)
4、消费端限流
当我们处理信息的A系统每秒只能处理1000个请求,但是用户真正的请求每秒可以达到5000次,为了防止A系统宕机,我们先把数据存放在MQ中,然后A系统每秒从MQ中拉取1000个来进行处理。
4.1、概述
1、配置文件中要设置成手动: acknowledge-mode: manual
2、配置文件中要设置拉取条数: prefetch: 1
3、消费者进行签收处理:channel.basicAck
4.2、配置文件
spring:
rabbitmq:
host: 192.168.92.134
port: 5672
username: zww
password: 123456
listener:
direct:
acknowledge-mode: manual
prefetch: 1
4.3、生产者发送10条数据
/**
* 发送10条消息的处理
*/
@Test
public void testReturn1(){
// 设置交换机处理失败消息模式
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
/**
* 消息失败了,会调用这个方法
* 注意:要进行处理失败的处理 rabbitTemplate.setMandatory(true);
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 失败的话,就要进行想对应的处理
}
});
// 这里的key填的是错误的,这样就不能到Queue
// 10条消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(EXCHANGE_NAME,"boot.confirm","传的信息:message confirm,第"+i);
}
}
4.4、消费者
/**
* 服务质量保证
* 限流机制: Consumer
* 1、ack设置为手动确认
* 2、listener-container配置属性
* prefetch = 1,表示消费端每次从mq拉取1条消息进行消费,消费完毕之后,在拉取下一条
* prefetch: 1
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
/**
* ackMode = "MANUAL"要进行这个的设置,不然就还是自动的,那么就会有冲突,会出现错误
* @param message
* @param channel
* @throws Exception
*/
@org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "boot_queue_confirm"
,ackMode = "MANUAL")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
System.out.println("使用prefetch获取消息为: "+new String(message.getBody()));
// 签收处理(如果没有签收,那么就只能消费一个信息)
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
4.5、结果
一个一个的进行接收处理:
5、TTL存活时间过期时间
5、TTL存活时间过期时间
1、把队列设置成带有ttl时间的队列,对整个队列消息统一过期
2、消息的单独过期处理,当消息在队列头部时,会单独判断这一消息是否过期
3、如果两者都进行设置了,会选择时间短的那个
5.1、生产者
配置文件:
@Configuration
public class RabbitMQConfigTTL {
public static final String QUEUE_NAME = "boot_queue_confirm";
public static final String EXCHANGE_NAME="boot_exchange_confirm";
// 1、交换机的配置
@Bean("bootExchange")
public Exchange exchange(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 2、队列的配置
@Bean("bootQueue")
public Queue bootQueue(){
// 消息10s之后,会自动过期
return QueueBuilder.durable(QUEUE_NAME).ttl(10000).build();
}
// 3、绑定关系 Binging
/**
* 1、知道哪个队列
* 2、知道哪个交换机
* 3、设置对应的 routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
生产者无单独过期处理:
@Test
public void testReturnttl(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(EXCHANGE_NAME,"boot.confirm","ttl的处理:"+i);
}
}
生产者队列过期处理:
@Test
public void testReturnttl(){
// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 1、设置message的信息(消息的过期时间)
message.getMessageProperties().setExpiration("5000");
// 2、返回该消息
return message;
}
};
// 消息单独过期
rabbitTemplate.convertAndSend(EXCHANGE_NAME,"boot.confirm","ttl的处理:",messagePostProcessor);
}
}
5.2、显示结果
6、死信队列DLX
6、死信队列
6.1、消息成为死信队列的三种情况
1、队列消息长度达到限制
2、消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue = false;
3、原队列存在消息过期设置,消息到达超时时间未被消费
6.2、实现以下
生产者 --> 正常交换机 —>正常队列(消息死信后)---->死信交换机---->死信队列 ---->消费者
6.3分析:
// 正常队列和交换机
public static final String QUEUE_NAME = "boot_queue";
public static final String EXCHANGE_NAME="boot_exchange";
// 配置死信队列和死信交换机
public static final String QUEUE_NAME_DLX = "boot_queue_dlx";
public static final String EXCHANGE_NAME_DLX="boot_exchange_dlx";
配置:
两个交换机正常配置,死信队列正常配置,正常队列配置时需要和死信交换机绑定。
绑定:
交换机和队列进行绑定。
队列核心配置:
Map<String, Object> args = new HashMap<>(4);
// 1、死信交换机名称
args.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX);
// 2、死信交换机的key
args.put("x-dead-letter-routing-key", "boot.dlx.hh");
// 3、设置队列的过期时间
args.put("x-message-ttl", 10000);
// 4、设置队列的长度限制
args.put("x-max-length",10);
生产者配置文件:
spring:
rabbitmq:
host: 192.168.92.134
port: 5672
username: zww
password: 123456
virtual-host: /
publisher-confirm-type: correlated
listener:
direct:
acknowledge-mode: manual
publisher-returns: true
生产者配置类:
@Configuration
public class RabbitMQConfigDLX {
// 正常队列和交换机
public static final String QUEUE_NAME = "boot_queue";
public static final String EXCHANGE_NAME="boot_exchange";
// 配置死信队列和死信交换机
public static final String QUEUE_NAME_DLX = "boot_queue_dlx";
public static final String EXCHANGE_NAME_DLX="boot_exchange_dlx";
// 1、交换机的配置
@Bean("bootExchange")
public Exchange exchange(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 死信交换机
@Bean("ExchangeDLX")
public Exchange exchangeDlx(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build();
}
// 2、正常队列和死信交换机
@Bean("bootQueue")
public Queue bootQueue(){
Map<String, Object> args = new HashMap<>(4);
// 1、死信交换机名称
args.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX);
// 2、死信交换机的key
args.put("x-dead-letter-routing-key", "boot.dlx.hh");
// 3、设置队列的过期时间
args.put("x-message-ttl", 10000);
// 4、设置队列的长度限制
args.put("x-max-length",10);
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
}
// 死信队列配置
@Bean("bootQueueDLX")
public Queue bootQueueDLX(){
return QueueBuilder.durable(QUEUE_NAME_DLX).build();
}
/**
* 正常交换机绑定正常队列
* 1、知道哪个队列
* 2、知道哪个交换机
* 3、设置对应的 routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.dlx.#").noargs();
}
// 死信交换机,和死信队列
@Bean
public Binding bindQueueExchangeDlx(@Qualifier("bootQueueDLX") Queue queue,@Qualifier("ExchangeDLX") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.dlx.#").noargs();
}
}
生产者测试类:(测试3种死信)
@Test
public void test10(){
// 1、测试过期时间 (10s,之后会跑到死信队列中)
rabbitTemplate.convertAndSend("boot_exchange","boot.dlx.hh","死信测试:");
// 2、测试长度,有10个消息在正常队列,有10个在死信队列
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("boot_exchange","boot.dlx.hh","死信测试:"+i);
}
// 3、消息拒收 (让消费者不接受,)
rabbitTemplate.convertAndSend("boot_exchange","boot.dlx.hh","死信测试:");
}
消费者处理(接收的队列为死信):
@Component
public class DLXListener implements ChannelAwareMessageListener {
/**
* ackMode = "MANUAL"要进行这个的设置,不然就还是自动的,那么就会有冲突,会出现错误
* @param message
* @param channel
* @throws Exception
*/
@org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "boot_queue_dlx"
,ackMode = "MANUAL")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1、接收消息
System.out.println("获取到: "+new String(message.getBody()));
// 2、处理业务逻辑
System.out.println("处理业务逻辑");
// 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
// 拒接签收 第三个参数:requeue:重回队列,设置成true,那么消息会回到queue,会重新发送消息给消费端。
System.out.println("出现异常,拒绝接收。。不重回队列");
// 出错的话,就会一直进行执行,重新尝试
channel.basicNack(deliveryTag,true,false);
}
}
}
运行结果测试:
1、测试过期时间 (10s,之后会跑到死信队列中)
10s之后(死信队列中多一条):
2、测试长度,有10个消息在正常队列,有10个在死信队列
3、消息拒收 (让消费者不接受,消费者出现错误然后不重回队列)
7、延迟队列 (TTL + DLX)
7、延迟队列
7.1、概述
* 延迟队列:消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费
* 处理:TTL + 死信队列
* 1、下单后,30min未支付,取消订单,回滚库存
* 2、新用户注册成功7min后,发短信问候
流程: 订单系统->MQ延迟队列(30min后消费)->库存系统->判断订单状态->支付,未支付
MQ延迟队列:
交换机 ->TTL队列->死信交换机->死信队列->库存系统
7.2、生产者配置类
/**
* 延迟队列:
* 1、正常交换机和正常队列
* 2、死信交换机和死信队列
* 3、正常队列和死信交换机(核心配置)
*/
@Configuration
public class RabbitMQConfigDLXTTL {
// 正常队列和交换机
public static final String QUEUE_NAME = "order_queue";
public static final String EXCHANGE_NAME="order_exchange";
// 配置死信队列和死信交换机
public static final String QUEUE_NAME_DLX = "order_queue_dlx";
public static final String EXCHANGE_NAME_DLX="order_exchange_dlx";
// 1、交换机的配置
@Bean("bootExchange")
public Exchange exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 死信交换机
@Bean("ExchangeDLX")
public Exchange exchangeDlx(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build();
}
// 2、正常队列和死信交换机
@Bean("bootQueue")
public Queue bootQueue(){
Map<String, Object> args = new HashMap<>(4);
// 1、死信交换机名称
args.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX);
// 2、死信交换机的key
args.put("x-dead-letter-routing-key", "dlx.order.cancel");
// 3、设置队列的过期时间
args.put("x-message-ttl", 10000);
// 4、设置队列的长度限制
args.put("x-max-length",1000);
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
}
// 死信队列配置
@Bean("bootQueueDLX")
public Queue bootQueueDLX(){
return QueueBuilder.durable(QUEUE_NAME_DLX).build();
}
/**
* 正常交换机绑定正常队列
* 1、知道哪个队列
* 2、知道哪个交换机
* 3、设置对应的 routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
// 死信交换机,和死信队列
@Bean
public Binding bindQueueExchangeDlx(@Qualifier("bootQueueDLX") Queue queue,@Qualifier("ExchangeDLX") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();
}
}
7.3、消费者
/**
* 延迟队列
*/
@Component
public class OrderListener implements ChannelAwareMessageListener {
/**
* ackMode = "MANUAL"要进行这个的设置,不然就还是自动的,那么就会有冲突,会出现错误
* @param message
* @param channel
* @throws Exception
*/
@org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "order_queue_dlx"
,ackMode = "MANUAL")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1、接收消息
System.out.println("获取到: "+new String(message.getBody()));
// 2、处理业务逻辑
System.out.println("数据库处理");
// 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
// 拒接签收 第三个参数:requeue:重回队列,设置成true,那么消息会回到queue,会重新发送消息给消费端。
System.out.println("出现异常,拒绝接收。。不重回队列");
// 出错的话,就会一直进行执行,重新尝试
channel.basicNack(deliveryTag,true,false);
}
}
}
运行结果(10s后会进行消费处理):
10s后: