项目代码:RabbitMQDemo: 学习RabbitMQ的一些整理
基本概念
- RabbitMQ是一种基于AMQP协议的消息队列实现框架
- RabbitMQ可以用于在系统与系统之间或者微服务节点之间,进行消息缓存,消息广播,消息分配以及限流消峰处理
- RabbitMQ-Server本身是一个可独立运行的第三方服务,开发者并不需要单独维护,相当于一个虚拟主机存储了RabbitMQ使用过程中所需要的各种变量和对象,同时也提供了UI级别的操作接口
关于消息队列的特点
形象一点:理解成快递驿站,快递员投放,客人取货,快递站暂存
- 生产者发起消费者订阅:生产者主动将消息发送到消息队列,消费者再从队列中获取消息。与HTTP请求消费者向生产者发起请求的数据流向相反。
- 异步而非同步:生产者推送消息到队列后生产者推逻辑已经结束,不再线程拥塞等待消费者接收消息后传回处理结果
- 推送而非调用:生产者只负责将信息推出去,消息怎么处理,有怎样的回应,不是他考虑的事,也不是RabbitMQ中间件需要考虑的事情,类似于广播机制
RabbitMQ核心概念
- 生产者:产生数据发送消息的程序是生产者。
- 交换机:交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个是由交换机类型决定的。
- 队列:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。
- 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
安装与控制台
- 安装Erlang:http://www.erlang.org/download,目前版本最好要求26 +
- 安装RabbitMQ服务端:http://www.rabbitmq.com/download.html ,需要梯子
- 启动可视化界面进行管理:
- 进入安装目录,rabbitmq_server-3.4.1\sbin,执行rabbitmq-plugins enable rabbitmq_management
- 重启WIndows下的RabbitMQ服务
- 登录网址http://127.0.0.1:15672/,用户名密码:guest/guest
开发准备
不一定非要按此结构构建项目这么写,只是为了方便测试和练习
父项目POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
</parent>
<packaging>pom</packaging>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
</dependencies>
<groupId>com</groupId>
<artifactId>RabbitMQDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>RabbitMQDemo</name>
<description>RabbitMQDemo</description>
<properties>
<java.version>8</java.version>
<spring-boot.version>2.7.18</spring-boot.version>
<spring-cloud-alibaba.version>2021.0.6.0</spring-cloud-alibaba.version>
<spring.cloud.version>2021.0.6</spring.cloud.version>
<hutol.version>5.5.7</hutol.version>
<dubbo.version>3.2.14</dubbo.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<!-- 统一管理,配置在此模块下的,子模块要引入依赖必须声明groupId和artifactId,不需要声明版本-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
公共模块
定义一些消费者和生产者公用的常量或者数据类型
消费者和生产者POM文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>实际模块名</artifactId>
<modelVersion>4.0.0</modelVersion>
<version>1.0</version>
<parent>
<groupId>com</groupId>
<artifactId>RabbitMQDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../../RabbitMQDemo</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<!--公共基础包 -->
<dependency>
<groupId>com</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
生产者测试接口
@RestController
@RequestMapping("/rabbitMQDemo/具体调用地址")
public class DirectProviderController {
@Resource
DirectProviderService directProviderService;
@GetMapping("/send")
public String send() {
return directProviderService.sendMsg(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
}
}
其他代码
依据不同交换机类型案例专项给出
交换机类型
直连模式-单一消费者
即一个生产者–一个队列–一个消费者
主要步骤
- 生产者创建队列,交换机对象,实现交换机与队列的绑定,并将这些信息登记到RabbitMQ服务中
- 生产者指明目标交换机和交换机-队列绑定关系后,向RabbitMQ服务推送消息
- 消费者订阅队列,并从队列中获取消息
可以看到,一般情况下消费者不用和交换机打交道,只需要知道自己要订阅哪个队列即可
生产者实现
-
配置文件
server: port: 8881 spring: application: name: RabbitMQ-Direct-Provider rabbitmq: addresses: 127.0.0.1 port: 5672 username: guest password: guest
-
创建队列等内容
@Configuration //这里只要创建了SpringBean对象即可,具体调用时后台会自动将这些对象注册到RabbitMQ-Sevrver中 public class DirectConfiguration { @Bean public Queue rabbitmqDemoDirectQueue() { /** * 1、name: 队列名称 * 2、durable: 是否持久化 * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。 * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。 * */ return new Queue(DirectRabbitMQTag.Direct_TOPIC, true, false, false); } @Bean public DirectExchange rabbitmqDemoDirectExchange() { //Direct交换机 return new DirectExchange(DirectRabbitMQTag.Direct_EXCHANGE, true, false); } @Bean public Binding bindDirect() { //链式写法,绑定交换机和队列,并设置匹配键 return BindingBuilder //绑定队列 .bind(rabbitmqDemoDirectQueue()) //到交换机 .to(rabbitmqDemoDirectExchange()) //并设置匹配键 .with(DirectRabbitMQTag.Direct_ROUTING_KEY); } }
-
配置消息转换器(该步骤目的,请参考常见问题:消息序列化一节内容)
@Configuration public class RestTemplateConfiguration { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数 rabbitTemplate.setMandatory(true); //这里配置在RabbitMQ传递消息时,将请求实体对象先转为JsonObject在进行投递 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } }
-
发送消息逻辑
@Service @Slf4j public class DirectProviderService { @Resource private RabbitTemplate rabbitTemplate; public String sendMsg(String msg) { String result = "OK"; try { //采用自定义消息体 DirectDto directDto = new DirectDto(); directDto.setMsg(msg); directDto.setMsgId(UUID.randomUUID().toString()); directDto.setMsgType("test"); //利用RabbitTempate对象进行发送,比使用原生RabbitMQ对象更加简单 //注意时异步的,消息发送到交换机后函数就执行完成了,至于消息有没有被谁解析,无从得知 rabbitTemplate.convertAndSend(DirectRabbitMQTag.Direct_EXCHANGE, DirectRabbitMQTag.Direct_ROUTING_KEY, directDto); } catch (Exception e) { log.error(e.getMessage(), e); result = "FAIL"; } return result; } }
-
检查步骤:登录RabbitMQ-Server查看队列和交换机是否创建
消费者实现
-
配置文件
server: port: 8882 spring: application: name: RabbitMQ-Direct-Client rabbitmq: addresses: 127.0.0.1 port: 5672 username: guest password: guest
-
创建消息转换器(该步骤目的,请参考常见问题:消息序列化一节内容)
@Configuration public class RabbitMQCustomerConfiguration { @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }
-
创建订阅并解析
@Component @Slf4j public class DirectReceiver { //对于消费者而言并不知道交换机编号或者交换机类型是什么,只关心队列名称,以便实现订阅 @RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC}) public void process(Message message) { //这里采用默认Message对象接收RabbitMQ的消息, //因为消费者和生产者都做了Json正反序列化,因此从message.getBody()获取的二进制流可以直接转换为消息体对应的JsonObjectStr log.info("Received message: {}", new String(message.getBody())); //实际返回结果 Received message: {"msgId":"a6c22848-76db-4f75-b24c-f5c49a335d77","msg":"2025-03-20 10:58:40","msgType":"test"} } }
直连模式-多个消费者
- 即一个生产者–一个队列–一个消费者
- 对于一个队列绑定了多个消费者的情况,RabbitMQ会议轮询的方式,将生产者的消息依次按顺寻分别分发到所有的消费者中,其效果非常类似等权值负载均衡
生产者代码
-
新增一个服务接口,以便测试功能
@Service @Slf4j public class DirectProviderService { @Resource private RabbitTemplate rabbitTemplate; public String sendMsg(String msg) { String result = "OK"; try { DirectDto directDto = new DirectDto(); directDto.setMsg(msg); directDto.setMsgId(UUID.randomUUID().toString()); directDto.setMsgType("test"); rabbitTemplate.convertAndSend(DirectRabbitMQTag.Direct_EXCHANGE, DirectRabbitMQTag.Direct_ROUTING_KEY, directDto); } catch (Exception e) { log.error(e.getMessage(), e); result = "FAIL"; } return result; } //新增功能,重复投递十个消息到队列,Controller层代码省略,正常调用即可 public String sendMsg2() { for (int index = 0; index < 6; index++) { this.sendMsg("index is " + index); } return "OK"; } }
消费者代码
- 复制一份上一节消费者代码,并让两者分别在8882,8883端口同时启动
- 注意一定要保证都订阅DirectRabbitMQTag.Direct_TOPIC队列
执行结果
扇出模式
- 一个生产者–多个队列-每个队列一个消费者
- 这种情况下,每个队列的消费者都会收到相同的消息,类似于广播机制
- 这种模式,交换机和队列没有Key绑定
生产者实现
-
配置文件
server: port: 8883 spring: application: name: RabbitMQ-Fanout-Provider rabbitmq: addresses: 127.0.0.1 port: 5672 username: guest password: guest
-
创建队列等内容
package com.provider.configuration; import com.common.config.FanoutRabbitMQTag; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfiguration { @Bean public Queue fanoutExchangeQueue01() { //队列A return new Queue(FanoutRabbitMQTag.FANOUT_QUEUE_01, true, false, false); } @Bean public Queue fanoutExchangeQueue02() { //队列B return new Queue(FanoutRabbitMQTag.FANOUT_QUEUE_02, true, false, false); } @Bean public FanoutExchange rabbitmqDemoFanoutExchange() { //创建FanoutExchange类型交换机 return new FanoutExchange(FanoutRabbitMQTag.FANOUT_EXCHANGE, true, false); } @Bean public Binding bindFanoutA() { //队列A绑定到FanoutExchange交换机,这里不需要提供队列和交换机KEY值 return BindingBuilder.bind(this.fanoutExchangeQueue01()).to(this.rabbitmqDemoFanoutExchange()); } @Bean public Binding bindFanoutB() { //队列B绑定到FanoutExchange交换机,这里不需要提供队列和交换机KEY值 return BindingBuilder.bind(this.fanoutExchangeQueue02()).to(this.rabbitmqDemoFanoutExchange()); } }
-
配置消息转换器
与直连模式相同,不与累述
-
发送消息逻辑(controller层省略)
@Service @Slf4j public class FanoutProviderService { @Resource private RabbitTemplate rabbitTemplate; public String sendMsg(String msg) { String result = "OK"; try { String msgId = UUID.randomUUID() + "\t" + msg; //没有KEY值,第二个参数传一个“”即可 rabbitTemplate.convertAndSend(FanoutRabbitMQTag.FANOUT_EXCHANGE, "", msgId); } catch (Exception e) { log.error(e.getMessage(), e); result = "FAIL"; } return result; } }
消费者实现
-
配置文件
server: port: 8884 spring: application: name: RabbitMQ-Fanout-Client rabbitmq: addresses: 127.0.0.1 port: 5672 username: guest password: guest
-
创建消息转换器
与直连模式相同,不与累述
-
创建订阅并解析
@Component @Slf4j public class FanoutReceiver { //注意:这里只订阅了队列1 @RabbitListener(queues = {FanoutRabbitMQTag.FANOUT_QUEUE_01}) public void process(String msg) { log.info("FanoutReceiver process message: {}", msg); } }
-
复制上述工程让其可以同时在8885启动,并订阅队列2
@Component @Slf4j public class FanoutReceiver { //注意:这里只订阅了队列2 @RabbitListener(queues = {FanoutRabbitMQTag.FANOUT_QUEUE_02}) public void process(String msg) { log.info("FanoutReceiver process message: {}", msg); } }
执行结果
-- 8884,8885两个端口的服务均会收到相同的消息
FanoutReceiver process message: 3fdd6882-97f3-489d-9c45-79bb3c598821 2025-03-24 10:42:47
常见问题:消息序列化
问题描述
- RabbitMQ在推送消息到队列时,支持传递自定义bean对象,但默认会直接转为二进制流字符数组(btye[])
- 消费者在接收消息时,很有可能无法正确的将byte[]反序列化为bean对象,进而引发
Failed to convert message
异常 - 即便将自定义Bean对象类型抽出到公用模块,在有些版本中依然不能解决问题
解决策略
- 强制修改RabbitMQ在推送消息过程中的转换器不转化为二进制对象,具体步骤包括
- 配置生产者转化器对象,让RabbitMQ将Bean转化为Json在转化为二进制byte[]传递
- 配置消费者解析器对象,让RabbintMQ将byte[]转化为JsonStr,再自行按需转换为Bean类型
实现步骤
生产者配置
@Configuration
public class RestTemplateConfiguration {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory)
{
//设置开启Mandatory,才能触发回调函数,⽆论消息推送结果怎么样都强制调⽤回调函数
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
生产者推送
@Service
@Slf4j
public class DirectProviderService {
@Resource
private RabbitTemplate rabbitTemplate;
public String sendMsg(String msg) {
String result = "OK";
try {
DirectDto directDto = new DirectDto();
directDto.setMsg(msg);
directDto.setMsgId(UUID.randomUUID().toString());
directDto.setMsgType("test");
rabbitTemplate.convertAndSend(DirectRabbitMQTag.Direct_EXCHANGE, DirectRabbitMQTag.Direct_ROUTING_KEY, directDto);
} catch (Exception e) {
log.error(e.getMessage(), e);
result = "FAIL";
}
return result;
}
}
消费者配置
@Configuration
public class RabbitMQCustomerConfiguration {
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
消费者解析
@Component
@Slf4j
public class DirectReceiver {
@RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})
public void process(Message message) {
log.info("Received message: {}", new String(message.getBody()));
}
}
//输出结果
//Received message: {"msgId":"a6c22848-76db-4f75-b24c-f5c49a335d77","msg":"2025-03-20 10:58:40","msgType":"test"}
其他说明
- 对于String,int,double这样的原子类型为消息体时
- 消费者和生产者不配置Json转换器,可以正常传输与解析
- 消费者和生产者配置了Json转换器,可以正常传输与解析,不会与Bean传输冲突
高级应用:手动确认
问题描述
- 默认情况下RabbitMQ采用自动签收模式,这种情况下【生产者】并不知道以下内容
- 消息是否正确的从生产者发送到了交换机:可能目标交换机被析构了,或者目标交换机明写错了
- 消息是否正确的从交换机投递到了指定队列:可能时交换机找不到队列了
- 因此生产者为了知道投递消息到交换机以及交换机推送到队列的情况,可以使用手工确认模式
其他说明
- 手工确认模式分为生产者修改和消费者修改,一般而言
- 生产者开启了手工确认,则消费者也要使用手工确认
- 生产者没有开启手工确认,消费者也可以使用手工确认,比如消费重试,死信队列等场景,但这种模式下,生产这无法知道投递状态
- 手工确认模式不会让生产者处理消费者在从队列拿到消息后处理失败的问题,即生产者最远只关心到消息有没有到队列,一旦到了队列后续再出错就不是生产者的职责范围了。
生产者的修改内容
- 开启手工确认模式,即允许监听投递交换机失败和推送队列失败的回调
- 实现ConfirmCallback回调,处理投递到交换机失败的异常
- 实现ReturnsCallback回调,处理推送到队列失败的异常
修改Yml文件
server:
port: 8881
spring:
application:
name: RabbitMQ-Direct-Provider
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
# 开启手工确认模式
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true # 消息投递失败返回客户端
实现回调
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//必须开启此参数,否则Returns回调不会被触发
rabbitTemplate.setMandatory(true);
//设置回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息投递成功! {}" , correlationData);
} else {
log.error("消息投递失败! {} ", cause);
}
});
rabbitTemplate.setReturnsCallback(returnedMessage -> log.info("ReturnedMessage: {}", returnedMessage));
return rabbitTemplate;
}
消费者的修改内容
-
在手工模式下消费者需要处理以下两个内容,特别注意这两个过程和生产者的confim,returns回调没有关系
永远不要企图让生产者去捕获或者处理消费者解析消息时的异常!!!!!!
- 从队列拿到消息后,若正常处理需要告知RabbitMQ(不是生产者)当前消息需要从队列中出队
- 若处理异常要告知RabbitMQ(不是生产者),当前消息时回到队列等待再次拿取,还是丢弃不要了
-
关于处理异常时采用了回到队列这种模式的后续处理与注意事项,请参看【消费重试】一节内容
解析工程配置
@Configuration
public class RabbitMQCustomerConfiguration {
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//对于配置了自定义类型转换的场景,如果想开启手工确认,必须在这里配置,yml中配置不生效!!!
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
一定要注意,如果注册了自定义解析工厂,那么在Yml无论怎么配置listener.simple.acknowledge-mode还是listener.direct.acknowledge-mode都是不起效的!!!!
修改接收逻辑
@Component
@Slf4j
public class DirectReceiver {
//对于手工签收,一定要使用Message对象,因此不建议搭配@RabbitHandler强制解析消息体类型,因此直接对解析方法增加@RabbitListener
//Channel参数为RabbitMQ自动注入的,不需要额外关心
@RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})
public void process(Message message, Channel channel) throws IOException {
//获取消息传递的标记
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//处理业务逻辑
log.info("开始解析消息...");
log.info(new JSONObject(new String(message.getBody())).toString());
//模拟随机出错的可能
int temp = 1 / RandomUtil.randomInt(0,2);
// 手动签收:第一个参数:表示收到的标签, 第二个参数:表示是否执行批处理确认
channel.basicAck(deliveryTag, true);
log.info("签收成功...");
} catch (Exception e) {
log.error("处理异常{},退回队列 ", e.getMessage());
//如果处理业务逻辑的时候发生了错误,则要拒绝签收
//参数1 multiple 是否批量处理
//参数2 requeue = true 表示将错误的消息,重新放回到队列中并且一般放置在队列头部的位置,等待重提取
//参数2 requeue = false 表示将错误的消息,不放回队列,如果有死信队列则放入到死信队列,没有则丢弃
channel.basicNack(deliveryTag, true,true);
}
}
}
测试结果
-- 生产者日志输出
2025-03-21 10:15:54.952 INFO 21220 --- [nectionFactory1] c.p.c.RestTemplateConfiguration : 消息投递成功! null
-- 消费者输出
2025-03-21 10:15:02.996 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 开始解析消息...
2025-03-21 10:15:02.997 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"8427d093-5df0-4258-b5fa-ea0e139b3325","msg":"2025-03-21 10:15:02","msgType":"test"}
2025-03-21 10:15:02.997 ERROR 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 处理异常,退回队列 / by zero
2025-03-21 10:15:03.011 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 开始解析消息...
2025-03-21 10:15:03.011 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"8427d093-5df0-4258-b5fa-ea0e139b3325","msg":"2025-03-21 10:15:02","msgType":"test"}
2025-03-21 10:15:03.012 ERROR 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 处理异常,退回队列 / by zero
2025-03-21 10:15:03.013 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 开始解析消息...
2025-03-21 10:15:03.013 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"8427d093-5df0-4258-b5fa-ea0e139b3325","msg":"2025-03-21 10:15:02","msgType":"test"}
2025-03-21 10:15:03.014 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 签收成功...
测试结果说明
- 只要生产者投递消息到交换机就会有Confim回调(不论成功与否)
- 交换机若转发到队列无异常,则不会触发Returns回调
- 若客户端手动确认消息时选择了【重回队列】那么该消息又会立刻被消费者重新接收,直到该消息被正确处理并出队
- 显然,步骤3这种逻辑存在缺陷,如果消息体自身就是包含问题数据,则重回队列-重新接收步骤就会无限重复,关于此问题请参考【消费重试】一节内容
高级应用:消费重试
问题描述
- 如果消息体自身可能包含问题数据,或者解析过程中出现IO等运行时异常,导致消费者解析会出错时,一般有两种处理方式
- 接收消息,消息出队,捕获异常,直接处理:直接,简单,但容错率低
- 拒绝消息,消息回队,重新抓取,再次处理:容错率搞,但复杂
- 因为消费者处理过程出现异常存在随机性,因此方式2更为合理,
- 但如果处理过程必然出错,例如解析逻辑bug,脏数据等,则重回队列-重新接收步骤就会无限重复,进而拥塞系统造成运行瘫痪
- 所以,消费重试必须要有次数限制,也必须要有兜底策略
其他说明
- 消费重试是【消费者】对解析过程异常的处理,和【生产者】没有关系
- 不论采用【手工确认】还是【自动确认】都可以接入消费重试,只是接入和实现方式有所不同
- 自动确认实现便利,但不够灵活
- 手工确认实现复杂,但足够灵活
- RabbitMQ集成Springboot中有很多中消费重试的实现框架,比如利用retryTemplate,Channel,可以按照需要自由选择
- 关于兜底策略RabbitMQ推荐将出错的消息推送到【死信队列】进而在RabbitMQ中统一管理,实际开发过程中也可以选择或其他的实现方式
- 关于死信队列相关内容,可以参考【死信队列】一节内容
手工确认模式下的重试实现
- 这只是一种很简单的实现逻辑
- 关于自动模式下的自动处理,可以参考这篇博文,注意处理异常不捕获和yml配置即可:RabbitMQ重试-自动签收模式
- 注意:请参考【高级应用:手动确认-消费者的修改内容-解析工厂配置】一节内容,配置解析工厂,确保手工确认模式起效
@Component
@Slf4j
public class DirectReceiver {
private final int maxRetryTimes = 3;
//对于手工签收,一定要使用Message对象,因此不建议搭配@RabbitHandler强制解析消息体类型,因此直接对解析方法增加@RabbitListener
//Channel参数为RabbitMQ自动注入的,不需要额外关心
@RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})
public void process(Message message, Channel channel) throws IOException {
//获取消息传递的标记
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//处理业务逻辑
log.info("开始解析消息...");
log.info(new JSONObject(new String(message.getBody())).toString());
//必然的解析出错
int temp = 1 / 0;
// 手动签收:第一个参数:表示收到的标签, 第二个参数:表示是否执行批处理确认
channel.basicAck(deliveryTag, true);
log.info("签收成功...");
} catch (Exception e) {
log.error("处理异常{},退回队列 ", e.getMessage());
//如果处理业务逻辑的时候发生了错误,则要拒绝签收
//参数1 multiple 是否批量处理
//参数2 requeue = true 表示将错误的消息,重新放回到队列中并且一般放置在队列头部的位置,
//参数2 requeue = false 表示将错误的消息,不放回队列,如果有死信队列则放入到死信队列,没有则丢弃
//channel.basicNack(deliveryTag, true,true);
this.handleRetry(message, channel, e);
}
}
private void handleRetry(Message message, Channel channel, Exception processException) {
try {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
Long retryCount = (Long) headers.getOrDefault("retry-count", 0L);
if (retryCount < maxRetryTimes) {
//手动进行应答, 接收此消息,先让消息出队,false表示只确认当前者一条消息出队
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//增加重试次数标记
headers.put("retry-count", retryCount + 1);
//重新发送消息发送到MQ中原有队列的【队尾】
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
new AMQP.BasicProperties().builder().contentType(message.getMessageProperties().getContentType()).headers(headers).build(),
message.getBody());
} else{
//兜底方案
//参数2 requeue = false 表示将错误的消息,不放回队列,如果有死信队列则放入到死信队列,没有则丢弃
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
//这里也可以存入将错误的请求体或者具体处理异常信息,存储日志,redis或者数据库中,方便人工查阅
log.error("重试次数超限,处理异常原因" , processException);
}
} catch (Exception e) {
log.error("重试处理异常{}, ", e.getMessage());
}
}
}
测试结果
2025-03-21 15:17:36.489 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 开始解析消息...
2025-03-21 15:17:36.513 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.514 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 处理异常/ by zero,退回队列
2025-03-21 15:17:36.519 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 开始解析消息...
2025-03-21 15:17:36.520 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.520 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 处理异常/ by zero,退回队列
2025-03-21 15:17:36.521 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 开始解析消息...
2025-03-21 15:17:36.521 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.521 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 处理异常/ by zero,退回队列
2025-03-21 15:17:36.524 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 开始解析消息...
2025-03-21 15:17:36.524 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.524 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 处理异常/ by zero,退回队列
2025-03-21 15:17:36.528 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 重试次数超限,处理异常原因
java.lang.ArithmeticException: / by zero
at com.customer.receiver.DirectReceiver.process(DirectReceiver.java:35) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_321]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_321]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_321]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_321]
测试结果说明
- 解析逻辑必然出错,触发异常后,消费者重试了三次
- 最终触发兜底方法,拒绝签收消息,让消息丢失
高级应用:死信队列
问题描述
- 在消费者处理信息时,可能因为一些环境原因导致解析过程发生异常,而消费者自身没有特殊的异常处理机制(自动重试,自动丢弃,本地存储等)
- 这时可以将引发异常的消息,从原有队列中迁出,转投递到死信队列,加以缓存,方便运维人员后期查询或者分析错误消息,
补充说明
- 与普通队列的相同点:
- 死信队列也是一种队列,需要搭配交换机才能完成,接收消息
- 死信队列也可以又消费者监听,进而实现消息从死信队列中出队
- 与普通队列的不同点:
- 死信队列不单独存在,一般需要和原有的业务队列进行绑定
- 死信队列的消息,不直接由生产者显式的投递,而是在特定条件下由RabbitMQ-Client自动投递
- 一般死信队列功能需要和【手工确认】搭配使用
生产者实现
配置文件
server:
port: 8881
spring:
application:
name: RabbitMQ-Direct-Dead-Provider
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
# 不起效 暂时不用配置
# listener:
# type: simple
# simple:
# default-requeue-rejected: false
# acknowledge-mode: manual
创建队列等内容
重点!!!!!
@Configuration
public class DirectConfiguration {
public static final String DIRECT_EXCHANGE = "direct-exchange";
public static final String DIRECT_ROUTING_KEY = "direct-routing-key";
public static final String DIRECT_QUEUE = "direct-queue";
public static final String DEAD_QUEUE = "dead-queue";
public static final String DEAD_ROUTING_KEY = "dead-routing-key";
public static final String DEAD_EXCHANGE = "dead-exchange";
@Bean
public DirectExchange rabbitmqDeadExchange() {
//定义死信交换机,类型为Direct
return new DirectExchange(DEAD_EXCHANGE, true, false);
}
@Bean
public DirectExchange rabbitmqDirectExchange() {
//定义业务交换机,类型为Direct
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}
@Bean Queue rabbitmqDeadQueue() {
//定义死信队列
return new Queue(DEAD_QUEUE);
}
@Bean
public Queue rabbitmqDirectQueue() {
//定义业务队列,并将业务队列与死信队列绑定,以便RabbitMQ自动投递
Map<String, Object> args = new HashMap<>(2);
//x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
//x-message-ttl, 生命TLL,超过5s没有被消费消息,也会自动投递到死信队列
args.put("x-message-ttl",5000);
return QueueBuilder.durable(DIRECT_QUEUE).withArguments(args).build();
}
@Bean
public Binding bindDirect() {
//链式写法,绑定交换机和队列,并设置匹配键
return BindingBuilder
//绑定队列
.bind(rabbitmqDirectQueue())
//到交换机
.to(rabbitmqDirectExchange())
//并设置匹配键
.with(DIRECT_ROUTING_KEY);
}
@Bean
public Binding bindDirectDead() {
//链式写法,绑定交换机和队列,并设置匹配键
return BindingBuilder
//绑定队列
.bind(rabbitmqDeadQueue())
//到交换机
.to(rabbitmqDeadExchange())
//并设置匹配键
.with(DEAD_ROUTING_KEY);
}
}
显然,死信队列和业务队列一样,都有队列申明,交换机申明,队列交换机绑定三个步骤,不同的是,业务队列申明时需要额外与死信队列进行绑定
配置消息转换器
不予累述
发送消息逻辑
@Service
@Slf4j
public class DirectProviderService {
@Resource
private RabbitTemplate rabbitTemplate;
public String sendMsg(String msg) {
String result = "OK";
try {
DirectDto directDto = new DirectDto();
directDto.setMsg(msg);
directDto.setMsgId(UUID.randomUUID().toString());
directDto.setMsgType(""+RandomUtil.randomInt(0,3));
rabbitTemplate.convertAndSend(DirectConfiguration.DIRECT_EXCHANGE, DirectConfiguration.DIRECT_ROUTING_KEY, directDto);
} catch (Exception e) {
log.error(e.getMessage(), e);
result = "FAIL";
}
return result;
}
}
消费者实现
配置文件
server:
port: 8882
spring:
application:
name: RabbitMQ-Direct-Dead-Client
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
消费者配置
注意:请参考【高级应用:手动确认-消费者的修改内容-解析工厂配置】一节内容,配置解析工厂,确保手工确认模式起效
消费者解析-正常接收消息
@Component
@Slf4j
public class DirectReceiver {
//手工模式确认
@RabbitListener(queues = "direct-queue")
public void receiveA(Message message, Channel channel) throws IOException {
DirectDto directDto = new JSONObject(new String(message.getBody())).toBean(DirectDto.class);
log.info("收到业务消息A, 准备处理:{}", directDto.toString());
try{
Thread.sleep(Integer.parseInt(directDto.getMsgType()));
} catch (Exception e) {
log.error(e.getMessage());
}
//这里模拟一下发生异常时,将消息投递到死信队列
if (RandomUtil.randomBoolean()){
log.error("消息消费发生异常");
//参数2 requeue = false 表示将错误的消息,不放回队列,因为有死信队列,所以放入到死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
log.error("消息消费完成");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
消费者解析-消费死信队列
@Component
@Slf4j
public class DeadReceiver {
@RabbitListener(queues = "dead-queue")
public void receiveA(Message message, Channel channel) throws IOException {
log.info("收到死信消息:{}", new String(message.getBody()));
//这里实际把消息从私信队列中迁出了,所以运行后RabbitMQ-Server在该私信队列中并没有看到记录
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
运行结果
2025-03-24 14:56:22.939 INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 收到业务消息A, 准备处理:DirectDto(msgId=67742847-101d-4074-a664-1ad0aa41ea41, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.941 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 消息消费完成
2025-03-24 14:56:22.942 INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 收到业务消息A, 准备处理:DirectDto(msgId=4e8236fb-1ae7-4427-b855-474603037c78, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.942 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 消息消费发生异常
2025-03-24 14:56:22.943 INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 收到业务消息A, 准备处理:DirectDto(msgId=428faf5f-107d-4295-abf7-33c6ac5da0bc, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.943 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 消息消费完成
2025-03-24 14:56:22.944 INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 收到业务消息A, 准备处理:DirectDto(msgId=d9c4121a-6e74-4944-80b3-3289a9c2d35d, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.944 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 消息消费完成
代码说明
- 因为在正常解析步骤中,采用了channel.basicNack方法并指定了requeue=false,RabbitMQ就会自动去找direct-queue队列是否有绑定死信队列
- 因为direct-queue绑定了dead-queue,所以出错的消息会被自动投递到dead-queue中,并自动标记为Ready状态,等待消息投递与解析
- 而对于死信队列解析,示例中为了方便演示采用了channel.basicAck方法,让当前消息从死信队列中出对了,如果不调用此方法,死信队列就会将此消息记为Unacked状态一直保留在死信队列中,具体采取哪种处理方式,应由实际工作需要决定
- 至于死信队列中Unacked的状态的消息,要如何在运维环境下处理或者查阅,另起文注