目录
一、概述
1、消息可靠性
2、SpringBoot整合RabbitMQ配置文件
二、生产者---RabbitMQ服务器如何保证信息不丢失
1、confirm确认模式
1.说明
2.SpringBoot代码实现
2、return退回模式
1.说明
2.SpringBoot代码实现
三、RabbitMQ服务器如何保证消息不丢失
四、RabbitMQ服务器---消费者如何保证消息不丢失
1、ACK
2、实现方式
3、SpringBoot代码实现
一、概述
1、消息可靠性
当生产者生产出一条消息发送给MQ是时,该消息来到MQ服务器会先到达交换机,然后由交换机根据路由分发给对应的队列,然后再由MQ服务器给消费者进行消费。这个步骤可分为三个过程分别是消息从消费者到交换机、消息从交换机到队列、消息从队列到消费者。那么RabbitMQ是如何保证消息的可靠性的呢。
在前两个过程,也就是消息从生产者到达服务器的过程,RabbitMQ提供了两种保证消息从生产者到MQ服务器消息可靠性的方法,分别是confirm确认模式与return退回模式。而在第三个过程,也就是消息从MQ服务器到消费者的过程,RabbitMQ提供了ACK模式,如果学习过计算机网络TCP协议就回明白,ACK(ackonwledge承认)当消费者收到MQ服务器的消息后,会给MQ服务器返回一个确认信息,服务器收到ack才会对该消息进行删除。
2、SpringBoot整合RabbitMQ配置文件
在了解这几个机制之前,先了解以下SpringBoot整合RabbitMQ的一些配置信息(yml形式)
spring:
rabbitmq:
host: 127.0.0.1 #RabbitMQ服务器IP
port: 5672 #端口
username: guest #用户名
password: guest #密码
virtual-host: /learn #虚拟机
publisher-confirm-type: correlated #开启确认机制
publisher-returns: true #开启回退模式
listener:
simple:
acknowledge-mode: manual #开启收动签收
prefetch: 4 #消费端每次拉去10条数据,直到确认消费完毕才拉去下10条
retry:
enabled: true #开启重试
max-attempts: 4 #重试最大次数
max-interval: 1000s #重试最大时长
二、生产者---RabbitMQ服务器如何保证信息不丢失
1、confirm确认模式
1.说明
当消息从producer生产者到达exchange交换机,会以异步地给消费者返回一个confirmCallbak回调,如果交换机收到了就返回true如果没有收到则返回false,如果返回false生产者收到该信息后可进行重发等处理
2.SpringBoot代码实现
首先我们需要创建一个RabbitMQ地配置类并注入Spring里,让他可以随着项目地启动而启动,然后我们在类里需要先创建队列以及交换机,此出我们使用topic模式进行演示【RabbitMQ】SpringBoot整合RabbitMQ、实现RabbitMQ五大工作模式(万字长文)_1373i的博客-CSDN博客https://blog.csdn.net/qq_61903414/article/details/130174313?spm=1001.2014.3001.5501
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置RabbitMQ
* 配置交换机、队列、绑定队列与交换机
*/
@Configuration
public class RabbitMQConfig {
/**
* topic通配符模式
*/
private static final String T_QUEUE1 = "tQueue1";
private static final String T_QUEUE2 = "tQueue2";
private static final String T_EXCHANGE = "tEx";
@Bean
public Queue tQueue1() {
return QueueBuilder.durable(T_QUEUE1).build();
}
@Bean
public Queue tQueue2() {
return new Queue(T_QUEUE2);
}
@Bean
public TopicExchange tEx() {
return ExchangeBuilder.topicExchange(T_EXCHANGE).durable(true).build();
}
@Bean
public Binding binding1(@Qualifier("tQueue1") Queue tQueue1,@Qualifier("tEx") TopicExchange tEx) {
return BindingBuilder.bind(tQueue1).to(tEx).with("A.*");
}
@Bean
public Binding binding2(@Qualifier("tQueue2") Queue tQueue2,@Qualifier("tEx") TopicExchange tEx) {
return BindingBuilder.bind(tQueue2).to(tEx).with("#.error");
}
}
其次我们要在配置文件里开启RabbitMQ地确认模式
此时我们就可以编写生产者类通过这个方法abbitTemplate.setConfirmCallback实现回调方法
package com.example.demo.controller;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.http.HttpServletRequest;
@Controller
@ResponseBody
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/tEx")
public void sendByT() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 配置信息
* @param b 是否手动消息
* @param s 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm被执行");
if (!b) {
System.out.println("接收失败" + s);
// 处理,让消息重发
}
}
});
// 故意修改错交换机名称,运行查看回调方法的执行
rabbitTemplate.convertAndSend("tExx","A.error","hello mq");
}
}
在发送消息时故意写错交换机名称,运行代码访问127.0.0.1:8080/tEx接口查看回调方法的执行
此时生产者代码打印了错误信息,此时我们可以通过代码对该消息进行重发处理
2、return退回模式
1.说明
当消息到达交换机以后就会根据路由key去到达匹配的队列里,如果消息在该过程没有到达queue。就会异步地返回一个returnCallback的回调,将错误信息告诉生产者,生产者可进行后续处理,当交换机消息由于路由问题没有到达队列时,此时交换机对消息的处理有两种方式,默认方式是直接丢弃,另一种是将消息返回给生产者,在后续代码实现时,我们要设置第二种方式
2.SpringBoot代码实现
首先要在配置文件里开启回退模式
然后在生产者类代码里这次我们要通过rabbitTemplate.setReturnCallback实现回调方法
package com.example.demo.controller;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.http.HttpServletRequest;
@Controller
@ResponseBody
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/tEx")
public void sendByT() {
rabbitTemplate.setMandatory(true); // 设为true消息由交换机给queue失败时返回给发送者
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnsCallback(){
/**
*
* @param message 回退的信息
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("执行了回退方法");
}
});
// 故意写错路由让消息到交换机后无法到达队列
rabbitTemplate.convertAndSend("tEx","lolo","hello mq");
}
}
此时运行代码,访问接口查看回调方法的执行
三、RabbitMQ服务器如何保证消息不丢失
在MQ服务器里交换机、队列等都是默认持久化到硬盘里的,消息到达MQ服务器是默认存储在硬盘里的。所以数据不会因为MQ服务器宕机重启而导致丢失。
四、RabbitMQ服务器---消费者如何保证消息不丢失
1、ACK
消费者收到消息后会给MQ服务器返回收到消息的确认信息,而确认在MQ里确认信息有三种,粉分别是:自动确认(默认)手动确认(manual)根据异常进行确认(auto)第三种比较麻烦。其中自动确认是指当消息一旦被消费者接收,就自动确认收到,MQ服务器就会对该消息进行删除。但是在业务代码里,消息收到后,业务可能会出现异常导致消息没有被真正的消费,那么消息就丢失了,此时就有了手动确认的方法,手动设置需要在业务执行完成后调用channel.basicAck()方法手动签收,而如果执行过程中代码出现了异常也可以使用channel.basicNack()方法进行拒收,让MQ服务器自动重发消息
2、实现方式
在上述介绍了三种实现方式,默认是自动确认但存在缺陷,下面我们用代码实现手动确认
3、SpringBoot代码实现
首先要做配置文件里添加开启自动确认的属性
然后创建一个消费者类并注入Spring中
此时编写消费者代码,手动确认与拒绝确认API在下面代码注释中有讲解
package com.example.demo.controller;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DemoQueueListener {
/**
* 手动签收
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "fQueue1")
public void listener(Message message, Channel channel) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("消费消息" + new String(message.getBody())); // 处理消息
/**
* 参数1 :收到消息的标签
* 参数2 :false--签收所有的消息
*/
int i = 10 / 0;
// 故意不手动接收 抛出异常 看MQ是否重发
// channel.basicAck(tag, true); // 确认签收
} catch (Exception e) {
/**
* 参数3:true--消息重回队列,会重发该消息 false---不回
*/
channel.basicNack(tag,true,true); // 拒绝签收
}
}
}
此时运行代码查看消息是否被消费,该队列只有一条消息,看是否会被消费
此时消息不但没有被消费,还被持续重发进行重复消费