RabbitMQ常见场景问题
文章目录
- RabbitMQ常见场景问题
- 6种工作模式
- 1.直连模式
- 2.发布订阅模式
- 3.Routing路由模式
- 4.Topic通配符模式
- 5.Header模式
- 6.RPC
- 消息不丢失
- 消息发送到交换机失败
- 1.配置文件开启发布确认
- 2.配置回调函数
- 3.测试
- 4.如何处理失败消息
- RabbitMQ服务器故障
- 持久化
- 消息发送到队列失败
- 1.配置文件开启消费确认
- 2.配置回调函数
- 3.测试
- 4.如何处理失败消息
- 消息消费失败
- 消息幂等性(重复消费)
- 问题
- 测试重复消费场景
- 解决
- 消息有序
- 消息消费顺序错乱原因
- 解决
- 消息堆积
- 消息堆积原因
- 解决
6种工作模式
1.直连模式
没有交换机,根据routing key直连队列
application.properties
server.port=8081
spring.rabbitmq.host=39.99.141.194
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.mvc.pathmatch.matching-strategy=ant-path-matcher
RabbitmqConfig
@Configuration
public class RabbitMqConfig {
//1.工作队列模式
//声明队列,同时交给spring
@Bean(name = "work-queue")
public Queue queue0(){
return new Queue("work-queue");
}
}
send
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void direct() {
rabbitTemplate.convertAndSend("direct", "这是直连模式");
}
}
consumer
@Controller
public class Consumer1 {
@RabbitListener(queues = "direct")
public void workQueue(String str){
System.out.println("当前监听到了:"+str);
}
}
控制台
2.发布订阅模式
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
RabbitmqConfig
//2.发布订阅模式
//声明了队列
@Bean(name = "queue1")
public Queue queue(){
return new Queue("publish-queue1");
}
@Bean(name = "queue2")
public Queue queue2(){
return new Queue("publish-queue2");
}
//广播的交换机
//声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("publish-exchange");
}
//将队列绑定到交换机
@Bean
Binding bindQueue1ToFanoutExchange(@Qualifier("queue1")Queue queue, FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
//将队列绑定到交换机
@Bean
Binding bindQueue2ToFanoutExchange(@Qualifier("queue2")Queue queue,FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
send
@Test
public void testSendPublish(){
Map map=new HashMap<>();
map.put("name","张三");
map.put("age",18);
//1.交换机的名称 2.你的规则,发布订阅模式为空 3.消息的主题
rabbitTemplate.convertAndSend("publish-exchange","",map);
}
consumer
@Controller
public class Consumer2 {
@RabbitListener(queues = "publish-queue1")
public void workQueue1(Map str1){
System.out.println("publish-queue1当前监听到了:"+str1);
}
@RabbitListener(queues = "publish-queue2")
public void workQueue2(Map str2){
System.out.println("publish-queue2当前监听到了:"+str2);
}
@RabbitListener(queues = "publish-queue3")
public void workQueue3(Map str3){
System.out.println("publish-queue3当前监听到了:"+str3);
}
}
控制台
3.Routing路由模式
路由模式:
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
RabbitMQConfig
//3.routing模式 -路由模式
//声明了3个队列
@Bean(name = "queue4")
public Queue queue4(){
return new Queue("routing-queue1");
}
@Bean(name = "queue5")
public Queue queue5(){
return new Queue("routing-queue2");
}
@Bean(name = "queue6")
public Queue queue6(){
return new Queue("routing-queue3");
}
//声明交换机,路由模式 DirectExchange
@Bean
public DirectExchange directExchange(){
return new DirectExchange("routing-exchange");
}
//建立队列与交换机的关系
@Bean
public Binding bindQueue1ToDirectExchange(@Qualifier("queue4")Queue queue,DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("info");
}
@Bean
public Binding bindQueue2ToDirectExchange(@Qualifier("queue5")Queue queue,DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("waring");
}
@Bean
public Binding bindQueue3ToDirectExchange(@Qualifier("queue6")Queue queue,DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("error");
}
send
@Test
public void testRouting() {
//1.交换机的名称 2.你的规则,发布订阅模式为空 3.消息的主题
rabbitTemplate.convertAndSend("routing-exchange", "info", "这是info");
rabbitTemplate.convertAndSend("routing-exchange", "warning", "这是warning");
// rabbitTemplate.convertAndSend("routing-exchange", "error", "这是error");
}
consumer
@Controller
public class Consumer3 {
@RabbitListener(queues = "routing-queue1")
public void routing1(String string) {
System.out.println("routing-queue1接收到:" + string);
}
@RabbitListener(queues = "routing-queue2")
public void routing2(String string) {
System.out.println("routing-queue2接收到:" + string);
}
@RabbitListener(queues = "routing-queue3")
public void routing3(String string) {
System.out.println("routing-queue3接收到:" + string);
}
}
控制台
只有info和warning收到了
4.Topic通配符模式
路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
RabbitMQConfig
//4.topic模式 -主题模式
//声明了3个队列
@Bean(name = "queue7")
public Queue queue7() {
return new Queue("topic-queue1");
}
@Bean(name = "queue8")
public Queue queue8() {
return new Queue("topic-queue2");
}
@Bean(name = "queue9")
public Queue queue9() {
return new Queue("topic-queue3");
}
//声明交换机,路由模式 DirectExchange
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic-exchange");
}
@Bean
public Binding bindQueue1ToTopicExchange(@Qualifier("queue7") Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("ex.123.123");
}
@Bean
public Binding bindQueue2ToTopicExchange(@Qualifier("queue8") Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("ex.*");
}
@Bean
public Binding bindQueue3ToTopicExchange(@Qualifier("queue9") Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("ex.#");
}
send
@Test
public void testTopic() {
//1.交换机的名称 2.你的规则,发布订阅模式为空 3.消息的主题
rabbitTemplate.convertAndSend("topic-exchange", "ex.123.123", "这是ex.123.123");
}
consumer
@Controller
public class Consumer4 {
@RabbitListener(queues = "topic-queue1")
public void routing1(String string) {
System.out.println("topic-queue1接收到:" + string);
}
@RabbitListener(queues = "topic-queue2")
public void routing2(String string) {
System.out.println("topic-queue2接收到:" + string);
}
@RabbitListener(queues = "topic-queue3")
public void routing3(String string) {
System.out.println("topic-queue3接收到:" + string);
}
}
控制台
队列1,3可以收到
5.Header模式
6.RPC
消息不丢失
消息发送到交换机失败
1.配置文件开启发布确认
spring.rabbitmq.publisher-confirm-type=correlated
2.配置回调函数
RabbitMQConfig:
@Slf4j
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("触发confirm回调,交换机接收到了");
} else {
log.info("触发confirm回调函数,交换机收不到信息,原因:" + cause);
log.info("消息对应的的CorrelationData id:" + correlationData.getId());
}
});
return rabbitTemplate;
}
}
3.测试
随便给交换机发送一条消息
4.如何处理失败消息
在ConfirmCallback监听中,当消息发送失败,ack失败时,我们又能拿到消息的CorrelationData,所以通过CorrelationData与消息之间的关系,我们在回调函数中通过CorrelationData来获取发送失败的消息,进而对其进行下一步操作(记录或重发等)
我们可以在发消息之前,将CorrelationData作为key,消息作为value,持久化起来(例如用redis数据库),当消息成功发送到交换机,ack为true时,我们再把他从持久化层中删除,这样的话,当消息发送失败时,我们就可以通过CorrelationData,从持久层中拿到发送失败的消息了
代码改造如下:
(CacheService是封装的redis操作工具类)
RabbitMQConfig:
@Slf4j
@Configuration
public class RabbitMQConfig {
@Autowired
private CacheService cacheService;
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.info("消息主体 message : " + returnedMessage.getMessage());
log.info("消息主体 message : " + returnedMessage.getReplyCode());
log.info("描述:" + returnedMessage.getReplyText());
log.info("消息使用的交换器 exchange : " + returnedMessage.getExchange());
log.info("消息使用的路由键 routing : " + returnedMessage.getRoutingKey());
});
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("触发confirm回调,交换机接收到了");
Long result = cacheService.hDelete("rabbitmq:" + correlationData.getId(), "exchange", "routingKey", "message");
log.info("已清除redis消息备份:"+result);
} else {
log.info("触发confirm回调函数,交换机收不到信息,原因:" + cause);
log.info("消息对应的的CorrelationData id:" + correlationData.getId());
Map<Object, Object> map = cacheService.hGetAll("rabbitmq:" + correlationData.getId());
String exchange = (String) map.get("exchange");
String routingKey = (String) map.get("routingKey");
String message = (String) map.get("message");
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
});
return rabbitTemplate;
}
send:
@Test
public void testConfirm() {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
String exchange = "routing-exchange";
String routingKey = "info";
String message = "这是info";
Map hashMap = new HashMap<>();
hashMap.put("exchange", exchange);
hashMap.put("routingKey", routingKey);
hashMap.put("message", message);
cacheService.hPutAll("rabbitmq:" + correlationData.getId(), hashMap);
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
模拟一个失败情况(向不存在的exchange发消息),就会自动进行重试,redis中会保存下所有的失败消息,我们定时做人工处理也是可以的
控制台
RabbitMQ服务器故障
持久化
开启交换机,队列,消息的持久化,可将其存储在磁盘上,可在服务重启后恢复
-
交换机持久化
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("publish-exchange",true,false); }
-
队列持久化
@Bean("direct") public Queue queue0() { return new Queue("direct",true); }
-
消息持久化
//spring的rabbitTemplate默认开启消息持久化 rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
消息发送到队列失败
1.配置文件开启消费确认
spring.rabbitmq.publisher-returns=true
2.配置回调函数
//消息消费确认
//mandatory:交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式
//true:RabbitMQ会调用Basic.Return命令将消息返回给生产者
//false:RabbitMQ会把消息直接丢弃
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.info("消息主体 message : " + returnedMessage.getMessage());
log.info("消息主体 message : " + returnedMessage.getReplyCode());
log.info("描述:" + returnedMessage.getReplyText());
log.info("消息使用的交换器 exchange : " + returnedMessage.getExchange());
log.info("消息使用的路由键 routing : " + returnedMessage.getRoutingKey());
});
3.测试
发送一个不存在的routingKey
4.如何处理失败消息
returnedMessage包含消息的所有信息,可以进行例如上面使用的redis进行持久化,再进行处理
消息消费失败
rabbitMQ有 ack 签收机制,简单来说就是三种模式:
AcknowledgeMode.NONE:默认推送的所有消息都已经消费成功,会不断地向消费端推送消息。所以推送出去的消息不会暂存在server端
AcknowledgeMode.AUTO: 由 spring-rabbit 依据消息处理逻辑是否抛出异常自动发送 ack(无异常)或 nack(异常)到 server 端。
AcknowledgeMode.MANUAL:模式需要人为地获取到 channel 之后调用方法向 server 发送 ack (或消费失败时的 nack )信息
消费结果 | 结果 | 批量操作 |
---|---|---|
ack | 表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除void basicAck(long deliveryTag, boolean multiple) | 允许 |
nack | 表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。void basicNack(long deliveryTag, boolean multiple, boolean requeue) | 允许 |
reject | 拒绝消息,与 basicNack 区别在于不能进行批量操作,其他用法很相似。void basicReject(long deliveryTag, boolean requeue) | 不允许 |
- deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag 都会递增。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
- multiple:为了减少网络流量,手动确认可以被批处理,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
-
配置文件开启手动签收
#多消费者轮询模式,每个消费者都能收到的未被消费的最大消息数量 spring.rabbitmq.listener.simple.prefetch=1 #设置消费端手动,返回分为:ack(无异常),nack(存在异常),reject(存在异常) spring.rabbitmq.listener.simple.acknowledge-mode=manual #开启重试 spring.rabbitmq.listener.simple.retry.enabled=true
-
消费者开启手动签收,并手动抛出异常用于测试
@RabbitListener(queues = "routing-queue1") public void routing1(String string, Channel channel, Message message) throws IOException { try { System.out.println("routing-queue1接收到:" + string); throw new RuntimeException("手动抛出异常,测试"); } catch (Exception e) { log.error("消息消费出现异常,重新入队伍"); /** * 出现异常,把消息重新投递回队列中,如一直有异常会一直循环投递 * deliveryTag:表示消息投递序号。 * multiple:是否批量确认。 * requeue:值为 true 消息将重新入队列。 */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true); } /** * 消息确认 ACK * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加 * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。 */ log.info("ACK消息消费确认....."); // 消息确认 basicAck channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息拒绝 basicReject //channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); }
-
测试
send
@Test public void testConfirm() { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); String exchange = "routing-exchange"; String routingKey = "info"; String message = "这是info"; Map hashMap = new HashMap<>(); hashMap.put("exchange", exchange); hashMap.put("routingKey", routingKey); hashMap.put("message", message); cacheService.hPutAll("rabbitmq:" + correlationData.getId(), hashMap); rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); }
会不停进行重新投递消费
消息幂等性(重复消费)
问题
保证MQ消息不重复的情况下,消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常(或者是服务中断),MQ没有接收到确认,此时MQ不会将发送的消息删除,
为了保证消息被消费,当消费者网络稳定后,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。
测试重复消费场景
发送5000条数据到queue,消费端自动应答
send
@Test
public void testAgain() {
String exchange = "routing-exchange";
String routingKey = "warning";
for (int i = 1; i <= 5000; i++) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
String message = "这是warning:" + i;
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
}
启动consumer,随后中断重启
@RabbitListener(queues = "routing-queue2")
public void routing2(String string, Channel channel, Message message) throws IOException {
System.out.println("routing-queue2接收到:" + string);
// 消息确认 basicAck
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
这个我测不出来,但理论上是可能出现问题的(QAQ)
解决
如何解决消息重复消费的问题:
为了保证消息不被重复消费,首先要保证每个消息是唯一的,所以可以给每一个消息携带一个全局唯一的id,流程如下:
-
消费者监听到消息后获取id,先去查询这个id是否存中
-
如果不存在,则正常消费消息,并把消息的id存入 数据库或者redis中(下面的编码示例使用redis)
-
如果存在则丢弃此消息
消费者改造,以消息id为key,消息内容为value存入setnx中,设置过期时间(可承受的redis服务器异常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)
/**
* setnx,如果redis中有记录,就会返回false,说明已经消费过了,无法写入
* @param string
* @param channel
* @param message
* @throws IOException
*/
@RabbitListener(queues = "routing-queue2")
public void routing2(String string, Channel channel, Message message) throws IOException, InterruptedException {
boolean b = cacheService.setIfAbsent("RabbitmqConsumer:"+message.getMessageProperties().getMessageId(), string, 10L, TimeUnit.MINUTES);
if (!b) {
return;
}
System.out.println("routing-queue2接收到:" + string);
}
测试,已经全部存入了redis
消息有序
消息消费顺序错乱原因
-
一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
-
一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。
解决
在必须保证顺序消费的业务中,单个队列对应单个消费者,单线程消费
消息堆积
消息堆积原因
- 消息堆积即消息没及时被消费,是生产者生产消息速度快于消费者消费的速度导致的。
- 消费者消费慢可能是因为:本身逻辑耗费时间较长、阻塞了。
解决
-
增加消费者消费能力,消费者内开启多线程处理消息(注意无法保证顺序消费)
-
建立新的queue,消费者同时订阅新旧queue,采用订阅模式
-
默认情况下,rabbitmq消费者为单线程串行消费(org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer类的concurrentConsumers与txSize(对应prefetchCount)都是1),设置并发消费两个关键属性concurrentConsumers和prefetchCount。concurrentConsumers:设置的是对每个listener在初始化的时候设置的并发消费者的个数;prefetchCount:每次从broker里面取的待消费的消息的个数。
配置方法:修改application.properties:spring.rabbitmq.listener.concurrency=m spring.rabbitmq.listener.prefetch=n