实现了RabbitMQ各个模式(simple、topic、direct、fanout及发送方确认和接收方确认)的一个demo
源码:https://gitee.com/xunan29/study-rabbitmq-test-project
参考文章:
https://blog.csdn.net/K_kzj_K/article/details/106642250
https://blog.csdn.net/qq_41466440/article/details/118567253
项目介绍
这个项目主要是实现了rabbitMQ各个模式(simple、topic、direct、fanout及发送方确认和接收方确认)的一个demo
rabbitmq-publisher-model
rabbitmq-receiver-model
上面俩是一起的,一个生产者,一个消费者
mq-message-callback-publisher-model9092
mq-message-callback-consumer-model9093
上面俩是一起的,一个生产者,一个消费者,其主要是根据上面那两个模块实现了发送方确认和接收方确认模式功能
依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.3</version>
</dependency>
配置文件
server:
port: 9090
#优雅关停
shutdown: graceful
spring:
lifecycle:
#强制关停时间(配合优雅关停)
timeout-per-shutdown-phase: 60s
application:
name: rabbitmq-publisher-model
#配置rabbitMq 服务器
rabbitmq:
# host: 172.2200.10.2
host: 127.0.0.1
port: 5672
username: guest
password: guest
#虚拟host 可以不设置,使用server默认host
virtual-host: /
其中需要注意rabbitmq代码访问端口是5672,而不是15672,15672是用来访问可视化管理界面的
config
config是按各个模式需求,配置了相应的队列、交换机和binding,为什么在消费者中也配置了:是因为如果没配置在最开始时没有先启动生产者,消费者这边会报错,消费者这边配置了就避免了这个问题。
controller
生产者的controller用来生产消息
service
消费者的service用来消费信息
其中work模式我没有实现,如果要写的话它就是在simple模式上,多加个消费者就可,它只会有一个消费者消费消息,参考:https://blog.csdn.net/qq_39240694/article/details/106911755
还有需要注意,如果消息传递java对象则必须java对象是在同一个模块或者提取到一个公用模块中(即消费者和生产者在一个模块才能使用),我项目中这种虽然在两个模块但同包同类名同路径也是不可以使用的
发送方确认和接收方确认
配置文件中加一些新配置
#配置rabbitMq 服务器
rabbitmq:
# host: 172.2200.10.2
host: 127.0.0.1
port: 5672
username: guest
password: guest
#虚拟host 可以不设置,使用server默认host
virtual-host: /
#确认消息已发送到交换机(Exchange)
#springboot.rabbitmq.publisher-confirm 新版本(2.2.0之后)已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果
#publisher-confirms: true
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
配置相关的消息确认回调函数,RabbitConfig.java
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
System.out.println("ConfirmCallback: "+"确认情况:"+ack);
System.out.println("ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback: "+"消息:"+message);
System.out.println("ReturnCallback: "+"回应码:"+replyCode);
System.out.println("ReturnCallback: "+"回应信息:"+replyText);
System.out.println("ReturnCallback: "+"交换机:"+exchange);
System.out.println("ReturnCallback: "+"路由键:"+routingKey);
}
});
上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?先从总体的情况分析,推送消息存在四种情况:
①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送到sever,交换机和队列啥都没找到
④消息推送成功①消息推送到server,但是在server里找不到交换机
也就是这里有交换机但是没有创建也没有配置
结论: ①这种情况触发的是 ConfirmCallback 回调函数。
②消息推送到server,找到交换机了,但是没找到队列
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作
结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。
③消息推送到sever,交换机和队列啥都没找到
结论: ③这种情况触发的是 ConfirmCallback 回调函数。
④消息推送成功
结论: ④这种情况触发的是 ConfirmCallback 回调函数。
消费者接收到消息的消息确认机制
和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:
①自动确认, 这也是默认的消息确认情况。 AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
② 根据情况确认, 这个不做介绍
③ 手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
basic.ack用于肯定确认
basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。
而basic.nack,basic.reject表示没有被正确处理:
着重讲下reject,因为有时候一些场景是需要重新入列的。
channel.basicReject(deliveryTag, true); 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。
使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。
但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。
顺便也简单讲讲 nack,这个也是相当于设置不消费某条消息。
channel.basicNack(deliveryTag, false, true);
第一个参数依然是当前消息到的数据的唯一id;
第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。
项目中MessageListener 的config类中进行了消息接收 手动确认 配置
==MyAckReceiver ==(在service里)是对应的手动确认消息监听类
上述的类每个都有两个,一个对应处理一个队列,另一个对应处理多个队列
里面配置在注释中有详细介绍,