前言:本文通过springBoot -maven 框架,对Rabbitmq 进行整合,完成客户端消息的发送和消费;
1 为什么要使用Rabbitmq:
RabbitMQ 是一个可靠的、灵活的、开源的消息中间件,具有以下优点:
-
异步通信:RabbitMQ 支持异步通信,使得消息发送者和接收者能够异步处理,提高了系统性能和吞吐量。
-
解耦合:RabbitMQ 的消息队列机制可以将发送者和接收者解耦合,减少了应用程序之间的耦合度。
-
可靠性高:RabbitMQ 支持事务和持久化,能够确保消息不会丢失。
-
高吞吐量:RabbitMQ 支持多种吞吐量调优方法,能够处理高并发的消息通讯。
-
可扩展性:RabbitMQ 支持集群和分布式部署,可以扩展到大规模的消息通讯场景。
RabbitMQ 提供了易用、高效、灵活、可靠的消息传递机制,可以帮助开发者更快地构建系统并实现各种复杂的业务场景。
2 springboot 整合:
2.1 pom 引入依赖:
<!-- rabbitmq 自动装配 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 提供web访问 默认端口8080 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- loomback 用于生成get set 方法 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 阿里的json 数据转换 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.31</version>
</dependency>
2.2 连接参数配置:
2.2.1 基础配置:
基础配置后springboot 的自动装载机制会注册一个RabbitTemplate rabbitTemplate 对象用于消息的接收和发送;
############# 基础配置
# mq 服务器的地址
spring.rabbitmq.host=localhost
# mq 服务器的端口
spring.rabbitmq.port=5672
# mq 服务器的连接使用的用户名
spring.rabbitmq.username=admin
# mq 服务器的连接使用的密码
spring.rabbitmq.password=rabbitmq
# mq 服务器的连接使用的虚拟机
spring.rabbitmq.virtual-host=my_vhost
注意: 其中 spring.rabbitmq.virtual-host 为隔离的虚拟机,需要根据自己业务进行配置,如果rabbitmq 有web 端可以在web端创建需要的v_host:
2.2.2 可扩展的连接参数配置:
############# 连接和管道配置
# When the cache mode is 'CHANNEL', the connection cache size cannot be configured.
# spring.rabbitmq.cache.connection.mode 为connection 生效 ,connection 连接池的大小
#spring.rabbitmq.cache.connection.size=3
# 与broker 连接的 模式 channel 或者 connection 默认channel
spring.rabbitmq.cache.connection.mode=channel
# 与broker 连接的默认时间,默认为 60000即 60 秒,超时会会中断并抛出异常,单位毫秒
spring.rabbitmq.connection-timeout=1000
# 每个连接中可以建立的channel 数量,默认值25
spring.rabbitmq.cache.channel.size=50
# 如果已达到channel缓存大小,等待获取channel的时间。 如果为0,则始终创建一个新channel
# 默认值为 -1,表示不限制等待时间,即一直等待直到获取到可用的 Channel,单位毫秒
spring.rabbitmq.cache.channel.checkout-timeout=2000
# 指定心跳超时,单位秒,0为不指定;默认60s
spring.rabbitmq.requested-heartbeat=60
# 客户端总共可以创建总的channel 数量
spring.rabbitmq.requested-channel-max=1024
默认与rabbitmq 的连接为channel,多个channel 公用一个connection , 每个线程都从缓存池中获取channel ,每个线程中持有的channel 是互相隔离的;
2.3 生产者发送消息:
生产者发送消息主要是通过 引入 RabbitTemplate 模版对象来完成;这里按照发送消息发送的场景分别进行介绍:
2.3.1 交换机和队列的绑定:
因为消息最开始是要发送到交换机上的,然后在通过交换机通过routkey 路由键到匹配的队列中;所以我们需要先在项目中使用的
virtual-host 中去分别创建交换机和队列,然后进行绑定;一帮情况下,我们应该向运维去申请自己的虚拟机,交换机,队列,然后通过后,项目中直接使用即可;当然通过代码也完全可以进行交换机和队列的创建和绑定,这里我们通过web 页面来进行处理:
2.3.1.1交换机的创建:
-
Virtual host : 对应隔离的虚拟机,所以需要选择项目中 通过spring.rabbitmq.virtual-host 参数连接的虚拟机;
-
Name: 虚拟机的名称,见名知意即可;
-
Type: 虚拟机的类型:比较常用的有直连 direct; 主题topic,广播fanout;
这里对交换机的类型进行简单的介绍: -
直连direct的交换机,交换机直接与队列完成绑定,通过发送消息是携带的Routing Key 和队列与 Exchange 绑定时指定的 Routing Key 精准匹配,然后路由消息到指定队列中:
-
Direct Exchange
Direct Exchange 是最简单的交换机类型,交换机直接与队列完成绑定,它根据消息携带的 Routing Key 和队列与 Exchange 绑定时指定的 Routing Key 精准匹配,然后路由消息到指定队列中。 Direct Exchange 可以理解为一张路由表,交换机通过 Routing Key 在路由表中查找匹配队列,将消息从生产者处发送到匹配队列。 -
Topic Exchange
Topic Exchange 根据 Routing Key 的匹配规则将消息路由到对应的队列中。Topic Exchange 支持两种匹配规则:* 代表通配符,表示可以匹配一个单词,# 代表通配符,表示可以匹配多个单词。例如,Routing Key 为 com.XXX.# 的消息会被路由到匹配 com.XXX. 开头的所有队列中,Routing Key 为 # ,会匹配到所有的消息;列如 user.* 匹配 user. 后跟一个单词的消息,可以匹配到user.a 但是匹配不到user.a.b 。 -
Fanout Exchange
Fanout Exchange 会将消息路由到所有绑定到它上面的队列中。Fanout Exchange 的路由方式与路由表无关,会忽略 Routing Key,与 Direct Exchange 和 Topic Exchange 相比,它具有更高的传输效率和更低的消耗。 -
Headers Exchange
Headers Exchange 根据消息头中的键值对匹配规则将消息路由到对应的队列中。Headers Exchange 的匹配规则相对较复杂,需要在绑定时指定键值对的匹配方式。 -
Durability : 交换机是否持久化到磁盘的属性值设置
-
如果将 Durability 属性设置为 durable ,表示交换器会被持久化到磁盘上,即使 RabbitMQ 服务器在交换机定义被创建之后终止,交换机定义仍然能够在服务器重新启动时得到恢复,从而保证交换机在重启后仍然存在。
-
如果将 Durability 属性设置为 transient ,表示交换器不会被持久化到磁盘上,如果 RabbitMQ 服务器重启,则该交换器定义将会丢失。
-
Auto delete 用于指定该交换机是否自动删除。当一个交换机关联的所有队列都被删除时,如果交换机的 Auto Delete 属性为 true,则该交换机也会被自动删除
-
Internal 是否为内部交换机:
内部交换机的 internal 属性设置为 true,使其只能被通过 AMQP 协议连接到相同 Virtual Host 的客户端使用,不能被直连类型的 Exchange 或 Headers 类型的 Exchange 所使用。
内部交换机只能用于消费者和生产者在同一个 RabbitMQ 实例中的场景,而不能用于服务器和客户端之间传递消息。
内部交换机主要用于应用程序之间传递消息,而不是用于服务器和客户端之间传递消息。 -
Arguments:交换机的额外属性,比较常用的属性如alternate-exchange:指定备用交换机。如果一条消息无法被路由到任何队列中,那么它将被发送到备用交换机中;
一般我们创建交换机时只需要选择Virtual host:,填入交换机的名称,选择交互机的类型这3项,其它都默认即可:
2.3.1.2 队列的创建:
- type 队列的类型:
在 RabbitMQ 中,队列的 type 参数共有三种,分别是 classic、quorum 和 stream。它们的区别可以简单概括如下:
classic 队列:
最早的、经典的队列类型,支持多个消费者竞争消费消息,但是在节点宕机时可能会出现消息丢失的情况。适用于简单的消息处理场景。
quorum 队列:
支持高可用性、多个消费者竞争消费的队列类型。它通过复制机制保证消息的可靠性,可以在节点宕机时自动进行故障转移,避免消息丢失。适用于需要高可用特性的分布式环境中使用,但相对来说,quorum 队列性能较 classic 队列有所下降。
stream 队列:
支持无限缓存的消息流队列,可以通过队列中的缓存来处理各种等待中的问题。传统队列中当消息进入队列时,它就被立即写入了内存中,并等待处理。这样做的问题是,当生产者不断地发送消息时,很容易将内存撑满。 stream 队列则允许队列的缓存区域随着时间和队列大小的增长而扩展,使得待处理的消息可以在缓存区域中有所体现。适用于需要处理海量时间序列数据的场景。
需要注意的是,stream 队列是从 RabbitMQ 3.8 开始引入的新类型,目前还不是很成熟,可能在稳定性和性能方面还需要更多的优化和改进。因此,在选择队列类型时,需要结合具体的业务情况和系统限制,选择采用 classic、quorum 还是 stream 队列,以达到最优的性能和可用性。
- Name 队列的名称;
- Durability 队列是否持久化,参数意义同交换机;
- Auto delete:
在 RabbitMQ 中,队列的 auto-delete 参数用于控制队列的自动删除行为。如果将 auto-delete 参数设置为 true,则在最后一个消费者断开连接时,队列会自动被删除。 - Arguments 队列参数的额外选择;
通常创建队列时只需要选择Virtual host,填入队列的名称,其它项默认即可:
2.3.1.3 交换机和队列的绑定:完成交换机和队列关系的绑定
2.3.2 发送消息:
2.3.2.2 生产者参数的配置:
########## 生产者配置
spring.rabbitmq.template.exchange=my_exchange
# 启用消息投递结果确认
spring.rabbitmq.publisher-returns=true
# 启用强制消息投递,即生产者发送消息成功或者失败,需要返回确认消息
spring.rabbitmq.template.mandatory=true
# 消息发布者确认模式
spring.rabbitmq.publisher-confirm-type=correlated
# 发送重试是否可用
spring.rabbitmq.template.retry.enabled= true
# 最大重试次数,默认值为 3
spring.rabbitmq.template.retry.max-attempts=3
# 第一次和第二次尝试发布或传递消息之间的间隔,默认值为 1000 毫秒
spring.rabbitmq.template.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为 initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.template.retry.multiplier=1
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.template.retry.max-interval= 1000
2.3.2.3 使用RabbitTemplate 模版发送单条消息,发送多条消息,发送延迟消息,使用自定义的RabbitTemplate 发送事务消息:
1) 定义一个类来封装我们要发送的消息结构:
package com.example.rabbitmqdemo.rabbitmq.msgDto;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class MsgDto implements Serializable {
// 消息类型
private String msgType;
// 消息体
private Object body;
}
2) 对RabbitTemplate 模版对象配置消息确认:
如果消息投递失败,我们需要对此类消息进行记录,方便后续进行数据补偿;
package com.example.rabbitmqdemo.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component("rabbitMqCustomerConfig")
public class BatchConfig {
@Value("${env:prod}")
private String env;
@Autowired
SimpleRabbitListenerContainerFactory containerFactory;
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void simpleListenerBatchInit() {
log.info("设置批量-----");
containerFactory.setBatchListener(true);
if ("prod".equals(env)) {
// 依照不同的环境进行开启
containerFactory.setAutoStartup(true);
}
// 设置 ConfirmCallback 回调函数 确认消息是否成功发送到 Exchang
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
if (null == correlationData) {
// 延迟消息 correlationData 为null
return;
}
log.debug("Message sent successfully:{} ", correlationData.getId());
} else {
if (null == correlationData && null == cause) {
// 延迟消息 correlationData 为null
return;
}
log.error("Message sent failed: {}", correlationData.getId() + ", cause: " + cause);
}
});
// ReturnCallback 处理的是未路由的消息返回的情况
rabbitTemplate.setReturnCallback((oneMessage, replyCode, replyText, exchange, routingKey) -> {
// 判断是否是延迟消息
if (routingKey.indexOf("delay") != -1) {
// 是一个延迟消息,忽略这个错误提示
return;
}
log.debug("Message returned: {}", new String(oneMessage.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
});
}
}
3) 因为发送事务需要关闭消息的确认,所以这里重新定义一个RabbitTemplate 模版用来发送事务消息:
package com.example.rabbitmqdemo.rabbitmq.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TxRabbitTemplate {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.port}")
private String port;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Bean(value = "txRabbitTemplat")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
private ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setChannelCacheSize(10);
// 关闭消息的ack 确认
connectionFactory.setPublisherConfirms(false);
connectionFactory.setPublisherReturns(false);
return connectionFactory;
}
}
4)使用自动装配的RabbitTemplate 模版来进行 消息发送 :
package com.example.rabbitmqdemo.rabbitmq.producer;
import com.alibaba.fastjson2.JSONObject;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.UUID;
@Slf4j
@Component
public class MessageProducer {
// 这里可以指定一个默认发送使用的交换机
@Value("${amqp-binding.exchange-name:my_exchange}")
private String exchangeName;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
@Qualifier("txRabbitTemplat")
private RabbitTemplate txRabbitTemplate;
/**
* 指定的routKey 发送信息
*
* @param message
*/
public void sendMessage(String routKey, Object message) {
this.sendMessage(exchangeName, routKey, JSONObject.toJSONString(message));
}
/**
* 通过交换机,路由key 发送消息
*
* @param exchangeName
* @param routKey
* @param message
*/
public void sendMessage(String exchangeName, String routKey, Object message) {
// 设置消息的唯一标识符
long deliveryTag = System.currentTimeMillis();
rabbitTemplate.convertAndSend(exchangeName, routKey, message, messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setMessageId(String.valueOf("messageId_" + deliveryTag));
return messagePostProcessor;
}, new CorrelationData(UUID.randomUUID().toString()));
}
/**
* 指定的routKey 发送批量信息
*
* @param messages
*/
public void sendMessageBatch(String routKey, MsgDto messages) {
this.sendMessageBatch(exchangeName, routKey, JSONObject.toJSONString(messages));
}
/**
* 通过交换机,路由key 发送批量信息
*
* @param exchangeName
* @param routKey
* @param messages
*/
public void sendMessageBatch(String exchangeName, String routKey, Object messages) {
rabbitTemplate.convertSendAndReceive(exchangeName, routKey, messages, messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setMessageId(String.valueOf("messageId_" + 1));
return messagePostProcessor;
}, new CorrelationData(UUID.randomUUID().toString()));
}
/**
* 指定的routKey 发送信息
*
* @param message
*/
public void sendDelayMessage(String routKey, Object message, long delayTime) {
this.sendDelayMessage(exchangeName, routKey, message, delayTime);
}
/**
* 指定的routKey 发送延迟信息
*
* @param message
*/
public void sendDelayMessage(String exchangeName, String routKey, Object message, long delayTime) {
log.debug("producer send delay message:{}", message);
rabbitTemplate.convertAndSend(exchangeName, routKey, message, header -> {
header.getMessageProperties().setHeader("x-delay", delayTime);
return header;
});
}
/**
* 指定的routKey 发送事务信息
*
* @param message
*/
@SneakyThrows
public void sendTxMessage(String exchangeName, String routKey, Object message) {
log.debug("producer send delay message:{}", message);
String messageStr = JSONObject.toJSONString(message);
// method 1:
// sendTransactedMsgByNewChannel(exchangeName,routKey,message);
// method2:
sendTransactedMsgByNTemplate(exchangeName, routKey, messageStr);
}
private void sendTransactedMsgByNTemplate(String exchangeName, String routKey, String message) {
txRabbitTemplate.execute(channel -> {
try {
String messageId = UUID.randomUUID().toString() + "_messageId";
String correlationId = UUID.randomUUID().toString() + "_correId";
// 创建 BasicProperties 对象并设置属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.messageId(messageId)
.correlationId(correlationId)
.build();
channel.txSelect(); // 开启事务
channel.basicPublish(exchangeName, routKey, properties, message.getBytes(Charset.forName("UTF-8"))); // 发送消息
// "124".substring(7);
channel.txCommit(); // 提交事务
} catch (Exception e) {
channel.txRollback(); // 回滚事务
}
return true;
});
}
@SneakyThrows
private void sendTransactedMsgByNewChannel(String exchangeName, String routKey, String message) {
// 获取新的channel 对象
Channel channel = txRabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
// 开启事务
channel.txSelect();
try {
// 消息格式化
channel.basicPublish(exchangeName, routKey, null, message.getBytes(Charset.forName("UTF-8")));
// 消息提交
channel.txCommit();
} catch (IOException e) {
channel.txRollback();
throw e;
}
}
}
5)测试代码:
package com.example.rabbitmqdemo.rabbitmq.controller;
import com.example.rabbitmqdemo.rabbitmq.enums.RabbitRoutKeyEnum;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.example.rabbitmqdemo.rabbitmq.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
@RestController
public class TestSendMsgController {
@Autowired
private MessageProducer messageProducer;
@GetMapping("/sendMsg")
public boolean sendMsg(@RequestParam String content,@RequestParam String routKey) {
List<Object> msgs = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
msgs.add(content+"_"+i);
}
msgs.stream().forEach(e->{
MsgDto msgDto = new MsgDto("user",e);
messageProducer.sendMessage(RabbitRoutKeyEnum.业务_单条消息.getRoutKey(),msgDto);
});
return true;
}
@GetMapping("/sendBatchMsg")
public boolean sendBatchMsg(@RequestParam String content,@RequestParam String routKey) {
List<Object> msgs = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
msgs.add(content+"_"+i);
}
MsgDto msgDto = new MsgDto("test",msgs);
messageProducer.sendMessageBatch(RabbitRoutKeyEnum.业务_多条消息.getRoutKey(), msgDto);
return true;
}
@GetMapping("/sendDelayMsg")
public boolean sendDelayMsg(@RequestParam String content,@RequestParam long delayTime) {
List<Object> msgs = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
msgs.add(content+"_"+i);
}
msgs.stream().forEach(e->{
messageProducer.sendDelayMessage("my_delay_exchange",RabbitRoutKeyEnum.业务_延迟.getRoutKey(),e,delayTime);
});
return true;
}
@GetMapping("/sendTxMsg")
public boolean sendTxMsg(@RequestParam String content) {
List<Object> msgs = new ArrayList<>(10);
for (int i = 0; i < 2; i++) {
msgs.add(content+"_"+i);
}
msgs.stream().forEach(e->{
MsgDto msgDto = new MsgDto("tx",e);
messageProducer.sendTxMessage("my_tx_exchange",RabbitRoutKeyEnum.业务_事务.getRoutKey(),msgDto);
// messageProducer.sendMessage(RabbitRoutKeyEnum.业务_单条消息.getRoutKey(),msgDto);
});
return true;
}
}
这里分别测试了单条消息,多条消息,延迟消息,事务消息的发送,将其封装为MsgDto对象,在发送时将其转为json 字符串;基本上满足了大部分的业务场景;需要注意的是rabbitmq 中所谓批量发送的消息实际上会被消息压缩为1条消息进行发送,到达队列是也是1条消息;
6 )routKey 的枚举类:
package com.example.rabbitmqdemo.rabbitmq.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum RabbitRoutKeyEnum {
业务_单条消息("my_routKey"),
业务_多条消息("my_batch_routKey"),
业务_1("my_one_routKey"),
业务_延迟("my_delay_routKey"),
业务_事务("my_tx_routKey"),
;
private String routKey;
}
至此我们已基本完成生产端消息的发送以及发送结果的监听处理;需要注意的是对于延迟消息,返回的确认消息correlationData 是一个null 值,所以这里对其消息的确认进行了一次特殊的判断;
3 消费者接收消息:
3.1 消费者参数的配置:
########## 消费者配置
# 是否自动启动消息的监听
spring.rabbitmq.listener.simple.auto-startup=false
# 消费消息确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 批量预取条数 默认值250
spring.rabbitmq.listener.simple.prefetch=50
# 开启批量消费
spring.rabbitmq.listener.simple.consumer-batch-enabled=true
# 批量消费的条数
spring.rabbitmq.listener.simple.batch-size=2
# 并发消费最小线程数
spring.rabbitmq.listener.simple.concurrency=1
# 并发消费最大线程数
spring.rabbitmq.listener.simple.max-concurrency=1
### 消费失败 重试参数
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 表示最大重试次数,默认值为 3
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 表示第一次重试的时间间隔,默认值为 1000 毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为 initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.listener.simple.retry.multiplier=1
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.listener.simple.retry.max-interval=1000
# 消息监听器是否启用无状态(stateless)重试 默认true
spring.rabbitmq.listener.simple.retry.stateless=false
# 控制当消息消费失败后,RabbitMQ 是否需要将消息重新入队。该参数的默认值为 true,即消息将被重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true
以上参数,配置了消费端消费消息后的ack 机制为手动提交,并且设定了 批量预取条数 和每次批量消费的条数,以及消费失败的重试机制配置;
3.2 消费消息:
消费者监听某个或者几个队列,然后通过channel 获取要消费的消息:
package com.example.rabbitmqdemo.rabbitmq.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
@Slf4j
@Component
public class MessageConsumer {
/**
* 逐条/批量 消费
*
* @param messages
*/
// @RabbitListener(queues = "my_queue_one")
public void receiveMessage(List<Message> messages, Channel channel) throws IOException {
log.debug("逐条消费消息:{}", messages);
for (Message message : messages) {
try {
// // 处理消息
log.debug("Received message: {}", message);
String jsonMessage = new String(message.getBody(), "UTF-8");
MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);
// 数据处理
// 手动发送 ack 消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception ex) {
// 发生异常,手动发送 nack 消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
/**
* 逐条消费--延时消息
*
* @param messages
*/
@RabbitListener(queues = "my_deay_queue")
public void receiveDelayMessage(List<Message> messages, Channel channel) throws IOException {
for (Message message : messages) {
try {
// 处理消息
log.debug("Received delay message: {}", message);
String jsonMessage = new String(message.getBody(), "UTF-8");
MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);
// 手动发送 ack 消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception ex) {
// 发生异常,手动发送 nack 消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
/**
* 逐条消费--事务消息
*
* @param messages
*/
@RabbitListener(queues = "my_tx_queue")
public void receiveTxMessage(List<Message> messages, Channel channel) throws IOException {
for (Message message : messages) {
try {
// 处理消息
log.debug("Received delay message: {}", message);
String jsonMessage = new String(message.getBody(), "UTF-8");
MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);
// 手动发送 ack 消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception ex) {
// 发生异常,手动发送 nack 消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
这里我们接收到消息后,然后通过"UTF-8"编码(生产者默认按照UTF-8 对数据编码后进行发送)将字节数据转换为字符串,然后通过阿里的json jar 完成java 对象的转换,进行业务处理,最后手动提交消息;
4 总结:
- Rabbitmq 对于消息的发送依赖于交换机,通过routKey 绑定不同的queue 完成消息的路由工作;
- Rabbitmq 发送消息可以为其配置ack确认机制,以及发送失败重试机制参数可以配合完成消息的发送;
- Rabbitmq 发送消息可以进行批量发送,但是本质上会被合并到一条消息进行发送;
- Rabbitmq 对于消息的消费,依赖于构建channel 管道 ,绑定queue 完成消息的消费;
- Rabbitmq 消费消息,可以进行手动的ack 确认,并且可以设置消费重试参数,应便于消费失败的场景;
5 扩展:
5.1 rabbitmq 发送事务消息为什么要关闭 消息的确认回调?
在RabbitMQ中,如果发送事务消息,并且开启了确认模式,那么需要特别注意关闭消息的确认回调,以避免一些潜在的问题。
在RabbitMQ中,开启事务模式后,生产者发送消息时,RabbitMQ会将消息缓存在生产者端。在事务提交之前,不会直接将消息发送到队列。如果在事务未提交的情况下,RabbitMQ服务器异常中断或者连接被关闭,那么消息将会丢失。为了避免这种情况的发生,可以采用事务提交确认和确认模式,在确认之后才将消息发送到队列中。
然而,在发送事务消息时,开启确认模式后,需要关闭消息的确认回调。这是因为在事务提交之前,消息并没有发送到队列中,确认回调将在消息发送到队列后才触发。而在事务模式下,消息已经被缓存到生产者端,没有被发送到队列中,所以确认回调不应该被触发。