RabbitMQ(2)、MQ的问题、消息可靠性

news2024/11/19 14:30:26

一、MQ的问题

基于上篇存在的问题

1. 问题说明

MQ在分布式项目中非常重要的,

它可以实现异步、削峰、解耦,但是在项目中引入MQ也会带来一系列的问题。

今天我们要解决以下几个常见的问题:

  • 消息可靠性问题:如何确保消息被成功送达消费者,并且被消费者成功消费掉

  • 延迟消息问题:如果一个消息,需要延迟15分钟再消费,像12306超时取消订单,如何实现消息的延迟投递

  • 消息堆积问题:如果消息无法被及时消费而堆积,如何解决百万级消息堆积的问题

  • MQ的高可用问题:如何避免MQ因为单点故障而不可用的问题

2. 准备代码环境

注意:为了后续的演示效果,暂不声明交换机、队列、绑定关系

创建project

  1. 删除project里的src文件夹

  2. 添加依赖坐标

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.9.RELEASE</version>
    <relativePath/>
</parent>

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>


创建生产者模块

依赖

不需要添加,直接继承父工程的依赖

配置

修改application.yaml,添加配置:

spring:
  rabbitmq:
    host: 192.168.200.137
    port: 5672
    virtual-host: /
    username: itcast
    password: 123321

引导类

package com.mqrebbit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class,args);
    }
}

发消息测试类

package com.mqrebbit;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class Demo01SimpleTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
        rabbitTemplate.convertAndSend("demo.exchange","demo","hello");
    }
}

创建消费者模块

依赖

不需要添加,直接继承父工程的依赖

配置

修改application.yaml,添加配置:

spring:
  rabbitmq:
    host: 192.168.200.137
    port: 5672
    virtual-host: /
    username: itcast
    password: 123321
    listener:
      simple:
        prefetch: 1

引导类

package com.mqrebbit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class,args);
    }
}

创建Listener

package com.mqrebbit.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DemoListener {
    @RabbitListener(queues = "demo.queue")
    public void handleDemoQueueMsg(String msg){
        log.info("从{}队列接收到消息:{}", "demo.queue", msg);
        System.out.println("模拟:处理消息中……");
        log.info("消息处理完毕");
    }
}

二、消息可靠性

1. 介绍

当我们的生产者发送一条消息后,这条消息最终会到达消费者

那么在这整个过程中任何一个环境出错,都可能会导致消息的丢失,而导致不够可靠

可能出问题的环节有:

  • 生产者发送消息到Broker时 丢失:

    消息未送达Exchange

    消息到达了Exchange,但未到达Queue

  • Broker收到消息后丢失:

    MQ宕机,导致未持久化保存消息

  • 消费者从Broker接收消息丢失:

    消费者接收消息后,尚未消费就宕机

针对这些问题,RabbitMQ给出了对应的解决方案

  • 生产者发送消息丢失:使用生产者确认机制

  • Broker接收消息丢失:MQ消息持久化

  • 消费者接收消息丢失:消费者确认机制与失败重试机制  

2. 生产者确认机制

介绍

在了解生产者确认机制之前,

我们需要先明确一件事:生产者发送的消息,怎么样才算是发送成功了?

消息发送成功,有两个标准

  • 消息被成功送达Exchange

  • 消息被成功送达匹配的Queue

以上两个过程任何一步失败,都认为消息发送失败了。

生产者确认机制,可以确保生产者明确知道消息是否成功发出,如果未成功的话,是哪一步出现问题。然后开发人员就可以根据投递结果做进一步处理。

Confirm Callback机制:确定回收

说明

使用发送者的ConfirmCallback机制,

用于让生产者确认消息是否送达交换机

如果消息成功送达交换机,MQ会给生产者返回一个ack(确认)。

当生产者得到ack之后,就可以确定消息成功送达交换机了

使用步骤,在生产者一方做如下操作:

  1. 修改配置文件,指定 confirm确认的处理方式,使用异步方式 correlated:相关的

  2. 发送消息时,配置CorrelationData:对比数据,用于处理确认结果     

声明一个交换机:

 

 

示例

1. 修改配置文件

修改生产者一方的配置文件application.yaml,增加如下配置

  • 如果配置为simple,表示使用同步方式处理确认的结果

  • 如果配置为correlated,表示使用异步方式处理确认的结果,但是发送消息时需要我们准备一个CorrelationData对象,用于接收确认结果

spring:
  rabbitmq:
    #生产者确认机制类型。simple同步方式确认;correlated异步方式确认,将使用CorrelationData接收确认结果
    publisher-confirm-type: correlated

2.发送消息

package com.mqrebbit;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@Slf4j
@SpringBootTest
public class Demo01SimpleTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test(){
        //创建一个CorrelationData Correlation:关联
        CorrelationData data = new CorrelationData();
        //设置消息的id
        data.setId("msg-001");
        //准备好 回调函数:Callback,用于接收 将来:Future 的确认结果
        data.getFuture().addCallback(
                //Success:成功
                /**
                 * 当消息发送没有出现异常时,这个方法会被调用
                 */
                result -> {
                    if (result.isAck()){
                        log.info("发送消息完成,且已经到达交换机。消息id={}",data.getId());
                    }else {
                        log.info("发送消息完成,但是没有到达交换机。消息id={},原因={}",data.getId(),result.getReason());

                    }                },
                //Failure:失败
                /**
                 * 当消息发送出现异常时,这个方法会被调用
                 */
                ex -> {
                    log.warn("发送消息出现异常,消息id={}",data.getId(),ex);
                }
        );
        rabbitTemplate.convertAndSend("demo.exchange", "demo", "hello",data);
    }
}

3.测试结果-未送达交换机的结果

首先,我们先要保证 demo.exchange交换机不存在,再运行单元测试方法,发送消息。可看到如下结果

4.测试结果-成功送达交换机

然后,我们再创建配置类,声明一个名称为demo.exchange的交换机

package com.mqrebbit.config;

import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitBindingConfig {
    //创建一个交换机
    @Bean
    public TopicExchange demoExchange(){
        return ExchangeBuilder.topicExchange("demo.exchange").build();
    }
}

然后重新发送消息,可看到如下结果:


Return Callback机制

说明

使用生产者的Confirm Callback机制,可以确保消息成功送达交换机

但是消息是否被送达队列呢?我们同样需要进行确认。

为了解决这个问题,RabbitMQ提供了Return Callback机制:

  • 如果消息被交换机成功路由到队列,一切正常

  • 如果消息路由到队列时失败了,Return回调会把消息回退给生产者。生产者可以自行决定后续要如何处理

使用步骤,在生产者一方操作:

  1. 修改配置文件,开启return callback机制,并设置强制return back 

  2. 创建配置类,预先设置Return回调函数


 

 

 

 示例

1. 修改配置文件

修改生产者的配置文件application.yaml,开启return回调机制

spring:
  rabbitmq:
    publisher-returns: true #开启生产者return回调机制
    template:
      mandatory: true #开启强制回调。如果为true,消息路由失败时会调用ReturnCallback回退消息;如果为false,消息路由失败时会丢弃消息

设置Return回调

当消息未被路由到Queue时,Return回调会执行

注意:

  • 只要给RabbitTemplate对象设置一次回调函数即可,并不需要每次发送消息都设置Return回调。所以我们在配置类里给RabbitTemplate设置一次即可

  • 给单例的RabbitTemplate对象设置Return回调的方式有多种,使用哪种都行,只要能够设置成功即可

创建一个配置类,在配置类里设置Return回调函数:

package com.mqrebbit.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

/*
 * ApplicationContext是: spring容器,所有bean对象都在这里面
 * ApplicationContextAware接口:当ApplicationContext初始化完成之后,
 * 接口的setApplicationContext方法将会被自动调用
 */
@Slf4j
@Configuration
public class RabbitConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //当交换机把消息路由到队列出现问题时,这个方法会自动执行
            log.warn("把消息路由到队列失败,replyCode{},replyText{},RoutingKey={},msg={}",
                    replyCode,replyText,exchange,routingKey, message);
        });
    }
}

测试结果-未路由到队列

首先,我们要先保证交换机没有绑定队列demo.queue,再运行单元测试方法,发送消息,可看到如下结果:

测试结果-成功路由到队列

然后,我们再找到RabbitBindingConfig配置类

增加一个队列demo.queue并绑定给交换机demo.exchange,最终代码如下:

@Configuration
public class RabbitBindingConfig {
    //创建一个交换机
    @Bean
    public TopicExchange demoExchange(){
        return ExchangeBuilder.topicExchange("demo.exchange").build();
    }
    @Bean
    public Queue demoQueue(){
        return QueueBuilder.durable("demo.queue").build();
    }
    @Bean
    public Binding demoBinding(Queue demoQueue,TopicExchange demoExchange){
        return BindingBuilder.bind(demoQueue).to(demoExchange).with("demo");
    }
}

然后再发送消息,不报错,就说明路由成功了。可以去RabbitMQ控制台上查看消息

 


3. MQ消息持久化

 介绍

通过生产者确认机制,我们可以把消息投递到队列中。但是如果这时候MQ宕机了,队列里的消息同样有可能会丢失。这是因为:

  • 交换机可能是非持久化的。MQ一重启,交换机就消失了

  • 队列可能是非持久化的。MQ一重启,队列就消失了

  • 消息可能是非持久化的(在RabbitMQ内存中)。MQ一重启,消息就丢失了

所以我们必须要保证:交换机、队列、消息都是持久化的。

但实际上,我们创建交换机、队列、消息的方式都是持久化创建的所以以下内容我们仅仅了解即可

 交换机持久化

@Bean
public TopicExchange demoTopicExchange(){
    return ExchangeBuilder
            .topicExchange("demo.exchange")
        	//设置交换机为持久化的,重启也不消息。
        	//但其实可以不设置,因为交换机默认就是持久化的
            .durable(true)
            .build();
}

 队列持久化

@Bean
public Queue demoQueue(){
    return QueueBuilder
            //使用durable("队列名称")方法  创建的就是持久化队列
            .durable("demo.queue")
            .build();
}

 消息持久化

Message message = MessageBuilder
        .withBody("hello".getBytes())
        //设置为持久化消息
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        .build();
rabbitTemplate.convertAndSend("demo.exchange", "demo", message, data);




​4. 消费者确认机制

介绍

 

RabbitMQ采用的是阅后即焚模式,即只要消息被消费成功获取,MQ就会立刻删除掉这条消息。所以,我们必须保证,消息确实成功的被消费掉了。为此,RabbitMQ也提供了ack确认机制:

  • RabbitMQ将消息投递给消费者

    • 消费者成功处理消息

    • 消费者向RabbitMQ返回ack确认

  • RabbitMQ收到ack确认,删除消息

从上述过程中我们可以得到,消费者返回ack的时机是非常关键的:如果消费者仅仅是得到消息还未处理,就给RabbitMQ返回ack,然后消费者宕机了,就会导致消息丢失。

SpringAMQP允许消费者使用以下三种ack模式:

  • manual:手动ack。由开发人员在处理完业务后,手动调用API,向RabbitMQ返回ack确认

  • auto:自动ack【默认】。当消费者方法正常执行完毕后,由Spring自动给RabbitMQ返回ack确认;如果出现异常,就给RabbitMQ返回nack(未消费成功)

  • none:关闭ack。MQ假定所有消息都会被成功消费,因为RabbitMQ投递消息后会立即删除

我们一般使用默认的auto模式


none模式

修改配置文件

修改消费者一方的配置文件application.yaml,设置消费者确认模式为none。添加如下配置:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none #设置 消费者确认模式为none


auto模式

修改消费者

修改消费者Listener代码,模拟处理消息出现异常的情况

@Slf4j
@Component
public class DemoListener {

    @RabbitListener(queues = "demo.queue")
    public void handleDemoQueueMsg(String msg){
        log.info("从{}队列接收到消息:{}", "simple.queue", msg);

        //模拟:处理消息中出现了异常
        int i = 1/0;

        log.info("消息处理完毕");
    }
}

测试效果

  1. 启动消费者服务

  2. 运行生产者单元测试类,发送消息

  3. 查看消费者的运行日志控制台

去RabbitMQ控制台(http://192.168.200.137:15672)查看队列里的消息,发现队列里没有消息。消息还没

auto模式

修改配置文件

修改消费者一方的配置文件application.yaml,设置消费者确认模式为auto

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto #设置 消费者确认模式为auto

修改消费者

代码和刚刚‘none’模式的代码相同,并没有调整。仍然是:模拟处理消息过程中出错

@Slf4j
@Component
public class DemoListener {

    @RabbitListener(queues = "demo.queue")
    public void handleDemoQueueMsg(String msg){
        log.info("从{}队列接收到消息:{}", "simple.queue", msg);

        //模拟:处理消息中出现了异常
        int i = 1/0;

        log.info("消息处理完毕");
    }
}

测试效果

  1. 重启消费者服务

  2. 运行生产者的单元测试方法,发送消息

  3. 查看消费者的运行日志控制台,发现程序在不停的报错。这是因为

    RabbitMQ在投递消息之后,消费者收到消息后抛了异常,导致没有给RabbitMQ返回ack确认

    RabbitMQ尝试重新投递消息,消费者收到消息后又抛了异常……


5. 消费者失败重试

介绍

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

我们可以利用Spring本身的retry机制,在消费者出现异常后,在消费者内部进行本地重试;而不是让消息重新入队列,然后让RabbitMQ重新投递。  

消费者本地重试

只要修改消费者一方的配置文件,设置消费者本地重试,并配置重试参数

修改消费者一方的配置文件application.yaml,增加如下配置:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true  #开始 消费者本地的失败重试
          initial-interval: 1000 #初始的失败等待时长,单位是ms,默认1000
          multiplier: 1 #与上次重试间隔时长的倍数(1表示每次重试的时间间隔相同)。默认1
          max-interval: 10000 #最大重试的间隔,默认值10000 
          max-attempts: 3 #最多重试几次。默认3
          stateless: true #是否无状态。默认true。如果涉及事务,要改成false

 重启消费者服务后,发现:

  1. 消费者重复获取了3次消息,在3次尝试中并没有抛出异常

  2. 在3次尝试都失败后,才抛出了RejectAndDontRequeueRecoverer异常

然后再去RabbitMQ控制台(http://192.168.200.137:15672),从队列里查看消息,发现消息已经被删除了


失败后的恢复策略

在刚刚的本地重试中,在达到最大次数后,消息会被丢弃,这是Spring内部机制决定的。

但是,其实在重试多次消费仍然失败后,SpringAMQP提供了MessageRecoverer接口,定义了不同的恢复策略可以用来进一步处理消息:

  • RejectAndDontRequeueRecoverer:重试次数耗尽后,直接reject,丢弃消息。是默认的处理策略

  • ImmediateRequeueMessageRecoverer:重试次数耗尽后,立即重新入队requeue

  • RepublishMessageRecoverer:重试次数耗尽后,将失败消息投递到指定的交换机

实际开发中,比较优雅的一个方案是RepublishMessageRecoverer,将失败消息重新投递到一个专门用于存储异常消息的队列中,等待后续人工处理。

使用步骤:

  1. 声明消息的恢复策略

  2. 声明交换机、队列、绑定关系

声明消息恢复策略

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//使用Topic实现
@Configuration
public class RepublishMessageRecovererConfig {

    /*
     * 消息消费失败后的恢复策略:使用RepublishMessageRecoverer策略
     */
    @Bean
    public MessageRecoverer republishMsgRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
    }


    @Bean
    public TopicExchange errorExchange() {
        return ExchangeBuilder.topicExchange("error.exchange").build();
    }

    @Bean
    public Queue errorQueue() {
        return QueueBuilder.durable("error.queue").build();
    }

    @Bean
    public Binding errorQueueBinding(TopicExchange errorExchange, Queue errorQueue) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error.#");
    }
}
//使用 Direct(Routing)实现 
@Configuration
public class RepublishMessageRecovererConfig {

    /*
     * 消息消费失败后的恢复策略:使用RepublishMessageRecoverer策略
     */
    @Bean
    public MessageRecoverer republishMsgRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
    }


    @Bean
    public DirectExchange errorExchange(){
        return ExchangeBuilder.directExchange("error.exchange").build();
    }

    @Bean
    public Queue errorQueue(){
        return QueueBuilder.durable("error.queue").build();
    }
    @Bean
    public Binding errorQueueBinding(Queue errorQueue, DirectExchange errorExchange){
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }
}

测试效果

  1. 消费者收到消息,模拟报错。耗尽重试次数

  2. 打开RabbitMQ控制台(http://192.168.200.137:15672),查看错误队列里的消息


6. 小结

1. 确认消息是否送达交换机:使用Confirm Callback机制
    修改生产者的配置文件,设置 confirm确认的类型,使用correlated
    在RabbitTemplate发送消息之前,预先准备一个CorrelationData对象,设置好消息id、

    设置Confirm回调函数
        发送消息时,把CorrelationData对象设置到convertAndSend方法里
2. 确认消息是否送达队列:使用Return Callback机制
    修改生产者的配置文件,开启return机制,设置 如果消息路由失败,就强制回退消息
    给Spring容器里唯一的RabbitTemplate设置ReturnCallback回调函数:当消息路由失败

    后,会执行的回调函数
3. 确定Broker是持久化的,包括:
    交换机:需要是持久化的
    队列:需要是持久化的
    消息:需要是持久化的
4. 消费者确认机制:确认消费成功了,才让MQ删除消息
    none:不确认。MQ只要投递消息,就直接删除消息。无论是否消费成功
    manual:手动确认。需要消费者手动调用API给MQ发送ack确认,MQ到ack之后删除消息
    auto:自动确认。如果消费者方法执行中没有异常,就自动给MQ发送ack确认,MQ删除

               消息
              如果消费者方法执行中出现异常,默认 会把消息重新入队requeue,重新投递给

              消费者,死循环……

         
5. 设置消费者本地重试
    当消费者消费失败后,不要重新入队requeue,而在消费者服务本地进行重试
    当重试次数耗尽之后,默认会丢弃消息
    使用方式:
        修改消费者的配置文件,设置retry为true
6. 如果消费者本地重试次数耗尽之后,有最终的恢复策略(回收策略)
    RejectAndDontRequeueRecoverer:向MQ发送一个reject通知,丢弃消息
    ImmediateRequeueMessageRecoverer:消息重新入队列Queue
    RepublishMessageRecoverer:把消息投递给另外一个指定的交换机,重新路由到新的队

    列里,后续可以人工干预再处理

三、死信交换机

1. 介绍

什么是死信

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false

       拒绝消息不再重新入队

  • 消息是一个过期消息,超时无人消费

  • 要投递的队列消息满了,无法投递

默认情况下,死信会直接丢弃

死信交换机    

死信交换机就是普通交换机

 

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

如图,一个消息被消费者拒绝了,变成了死信;因为demo.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机;如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:

注意:

  • 死信交换机,其实是普通交换机。只是用于处理死信,所以称为死信交换机

  • 死信队列,其实也是普通队列。只是用于处理死信,所以称为死信队列

      死信交换机都与队列有关系

2. 消费失败成为死信【拓展】

在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。

我们可以给demo.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。

取消恢复策略

消费者服务中RabbitMsgRecovererConfig配置类里的恢复策略注释掉,则SpringAMQP将会使用默认的RejectAndDontRequeueRecoverer策略,在本地重试次数耗尽后,发送reject给RabbitMQ


配置死信交换机

 修改生产者的配置类,在声明队列时绑定死信交换机

 

1. 声明死信交换机

 2.声明队列

 

@Configuration
public class RabbitBindingConfig {

    @Bean//声明一个交换机
    public TopicExchange demoExchange() {
        return ExchangeBuilder.topicExchange("demo.exchange").build();
    }

    @Bean //声明一个队列
    public Queue demoQueue() {
        return QueueBuilder.durable("demo.queue")
                //给队列绑定死信交换机,名称为dl.exchange
                .deadLetterExchange("dl.exchange")
                //如果队列信息成为死信,在投递给死信交换机的时候,需要携带的RoutingKey为dl
                .deadLetterRoutingKey("dl")
                .build();
    }

    @Bean
    public Binding demoBinding(Queue demoQueue, TopicExchange demoExchange) {
        return BindingBuilder.bind(demoQueue).to(demoExchange).with("demo");
    }

    //-------------------------死信交换机、死信队列、绑定关系---------------------------------------
    @Bean
    public DirectExchange dlExchange() {
        return ExchangeBuilder.directExchange("dl.exchange").build();
    }

    @Bean
    public Queue dlQueue() {
        return QueueBuilder.durable("dl.queue").build();
    }

    @Bean
    public Binding dlBinding(DirectExchange dlExchange, Queue dlQueue) {
        return BindingBuilder.bind(dlQueue).to(dlExchange).with("dl");
    }
}

测试效果

  1. 先去RabbitMQ控制台页面(http://192.168.200.137:15672)中,demo.queue队列删除掉。因为之前声明的队列并没有绑定死信交换机,必须要删除掉,重新声明才行

  2. 运行生产者的单元测试方法,发送消息

  3. 启动消费者服务,开始从demo.queue中接收消息但出现异常;在耗尽重试次数后,因为恢复策略是默认的RejectAndDontRequeueRecoverer成为死信。消息被投递到死信交换机,然后路由到死信队列

  4. 在RabbitMQ控制台中查看死信队列dl.queue,可看到死信队列中有一条消息

 

3. 消息超时成为死信

说明

如果一条消息超时未被消费,也会成为死信。而超时有两种方式:

  • 消息所在的队列设置了超时

  • 消息本身设置了超时

我们将按照如下设计,演示超时成为死信的效果:

 


队列TTL示例

注意:为了方便演示死信队列的效果,我们将创建一个新的project,准备新的代码环境。参考第一章节中准备的代码环境。

生产者

声明队列和交换机

  • 声明死信交换机与死信队列,并绑定

  • 声明普通交换机与普通队列,并绑定。注意,声明普通队列时要:

    设置队列的TTL

    队列设置死信交换机与死信的RoutingKey

 

 

 

@Configuration
public class RabbitBindingConfig {

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue")
                //设置队列的超时时间为10s
                .ttl(10*1000)
                //给队列设置死信交换机,名称为dl.ttl.exchange;设置投递死信时的RoutingKey为ttl
                .deadLetterExchange("dl.ttl.exchange").deadLetterRoutingKey("ttl")
                .build();
    }

    @Bean
    public DirectExchange ttlExchange(){
        return ExchangeBuilder.directExchange("ttl.exchange").build();
    }

    @Bean
    public Binding ttlBinding(Queue ttlQueue, DirectExchange ttlExchange){
        return BindingBuilder.bind(ttlQueue).to(ttlExchange).with("demo");
    }

    //--------------------死信交换机、死信队列、死信绑定关系------------------------------

    @Bean
    public DirectExchange dlTtlExchange(){
        return ExchangeBuilder.directExchange("dl.ttl.exchange").build();
    }

    @Bean
    public Queue dlTtlQueue(){
        return QueueBuilder.durable("dl.ttl.queue").build();
    }

    @Bean
    public Binding tlTtlBinding(Queue dlTtlQueue, DirectExchange dlTtlExchange){
        return BindingBuilder.bind(dlTtlQueue).to(dlTtlExchange).with("ttl");
    }
}

发送消息

注意:声明队列时已经给队列设置了TTL,所以发送消息时不需要给消息设置TTL

@Slf4j
@SpringBootTest
public class DemoMessageTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend("ttl.exchange", "demo", "demo dead letter,发送时间是:" + LocalTime.now());
    }
}

消费者

@Slf4j
@Component
public class DemoListener {

    /**
     * 监听死信队列
     */
    @RabbitListener(queues = "dl.ttl.queue")
    public void handleDemoQueueMsg(String msg){
        log.info("现在时间是:{},从{}队列接收到消息:{}", LocalTime.now(), "dl.ttl.queue", msg);
    }
}

测试效果

  1. 运行生产者的单元测试代码,发送一条消息

  2. 启动消费者服务,等待接收消息。发现消费者在10s后收到了消息


消息TTL示例

 

 

 生产者

声明队列和交换机

可以直接使用刚刚的“队列TTL示例”中的配置,与之相比,仅仅是声明队列时不再设置队列的TTL。代码如下:

@Configuration
public class RabbitBindingConfig {

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue")
                //给队列设置死信交换机,名称为dl.ttl.exchange;设置投递死信时的RoutingKey为ttl
                .deadLetterExchange("dl.ttl.exchange").deadLetterRoutingKey("ttl")
                .build();
    }

    @Bean
    public DirectExchange ttlExchange(){
        return ExchangeBuilder.directExchange("ttl.exchange").build();
    }

    @Bean
    public Binding ttlBinding(Queue ttlQueue, DirectExchange ttlExchange){
        return BindingBuilder.bind(ttlQueue).to(ttlExchange).with("demo");
    }

    //--------------------死信交换机、死信队列、死信绑定关系------------------------------

    @Bean
    public DirectExchange dlTtlExchange(){
        return ExchangeBuilder.directExchange("dl.ttl.exchange").build();
    }

    @Bean
    public Queue dlTtlQueue(){
        return QueueBuilder.durable("dl.ttl.queue").build();
    }

    @Bean
    public Binding tlTtlBinding(Queue dlTtlQueue, DirectExchange dlTtlExchange){
        return BindingBuilder.bind(dlTtlQueue).to(dlTtlExchange).with("ttl");
    }
}

发送消息

发送消息时设置消息的TTL

@Slf4j
@SpringBootTest
public class DemoMessageTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        String msgStr = "消息TTL demo,发送时间是:" + LocalTime.now();
        
        Message message = MessageBuilder
                .withBody(msgStr.getBytes())
                //设置消息TTL为5000毫秒
                .setExpiration("5000")
                .build();

        //发送消息时:
        //  如果消息和队列都设置了TTL,则哪个TTL短,哪个生效
        //  如果消息和队列只设置了一个TTL,则直接以设置的为准
        rabbitTemplate.convertAndSend("ttl.exchange", "demo", message);
    }
}

消费者

直接使用刚刚“队列TTL示例”中的消费者代码即可。代码如下:

@Slf4j
@Component
public class DemoListener {

    /**
     * 监听死信队列
     */
    @RabbitListener(queues = "dl.ttl.queue")
    public void handleDemoQueueMsg(String msg){
        log.info("现在时间是:{},从{}队列接收到消息:{}", LocalTime.now(), "dl.ttl.queue", msg);
    }
}

测试效果

  1. 运行生产者的单元测试代码,发送消息

  2. 启动消费者服务,开始监听消息。发现消费者在5s后收到了消息



4. 延迟队列

4.1 介绍

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。延迟队列的使用场景非常多,例如:

  • 用户下单,如果用户在15 分钟内未支付,则自动取消订单

  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟消息的需求非常多,所以RabbitMQ官方也推出了一个延迟队列插件,原生支持延迟消息功能。插件名称是:rabbitmq_delayed_message_exchange

官网插件列表地址:Community Plugins — RabbitMQ

4.2 安装

大家可以去对应的GitHub页面下载3.8.9版本的插件,这个版本的插件对应RabbitMQ3.8.5以上版本:Release v3.8.9 · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

也可以直接使用资料中提供好的插件。

安装步骤如下:

  1. 查看mq的数据卷目录

    因为我们的MQ是使用docker安装的,而创建mq容器时挂载了名称为mp-plugins的数据卷。

    我们要先查看一下数据卷的位置,执行命令:docker volumes inspect mq-plugins

    可知数据卷的目录在/var/lib/docker/volumes/mq-plugins/_data

 

  1. 使用finalshell或其它工具,把插件上传到虚拟机的/var/lib/docker/volumes/mq-plugins/_data目录中

  2. 进入mq容器内部:docker exec -it mq /bin/bash

  3. 在容器内执行命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

 

4.3 原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息

  • 判断消息是否具备x-delay属性

  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间

  • 返回routing not found结果给消息发送者

  • x-delay时间到期后,重新投递消息到指定队列

4.4 使用示例

插件的使用步骤也非常简单:

  • 声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true,然后声明队列与其绑定即可。

  • 发送消息时,指定一个消息头 x-delay,值是延迟的毫秒值

声明队列和交换机

@Bean
public DirectExchange delayExchange(){
    return ExchangeBuilder.directExchange("delay.direct.exchange")
            //设置为“延迟交换机”
            .delayed()
            .build();
}
@Bean
public Queue delayQueue(){
    return QueueBuilder.durable("delay.queue").build();
}
@Bean
public Binding delayBinding(Queue delayQueue, DirectExchange delayExchange){
    return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay");
}

也可以使用注解方式声明,示例代码:  

@RabbitListener(bindings = @QueueBinding(
     value = @Queue("delay.queue"),
     exchange = @Exchange(value = "delay.direct.exchange", type = ExchangeTypes.DIRECT),
     key = "delay"
))
public void handleDelayQueueMsg(String msg){
 log.info("现在时间是:{},从{}队列接收到消息:{}", LocalTime.now(), "delay.queue", msg);
}

发送消息

发送消息时,必须指定x-delay头,设置延迟时间

@Test
public void test2(){
    String msgStr = "这是一条延迟消息,发送时间是:" + LocalTime.now();
    Message message = MessageBuilder
            .withBody(msgStr.getBytes())
            .setHeader("x-delay", 10000)
            .build();
    rabbitTemplate.convertAndSend("delay.direct.exchange", "delay", message);
}

 

监听消息

@RabbitListener(queues="delay.queue")
public void handleDelayQueueMsg(String msg){
    log.info("现在时间是:{},从{}队列接收到消息:{}", LocalTime.now(), "delay.queue", msg);
}

5. 小结

1. 一条消息如何成为死信
    消息被消费失败(重试次数耗尽,并且不再重新入队),成为死信
    消息超时成为死信(无论是队列的ttl,还是消息ttl)
    队列满了成为死信
    
    MQ默认处理死信的方式是:丢弃消息
    给队列绑定死信交换机,可以把死信重新投递到新的队列里

2. 消息超时成为死信:
    给队列设置ttl:在声明队列的时候, 
        QueueBuilder.durable("队列名称")
            .ttl(毫秒值)
            .deadLetterExchange("死信交换机名称")
            .deadLetterRoutingKey("投递到死信交换机时要携带的RoutingKey")
            .build();
    给消息设置ttl:使用MessageBuilder创建消息
        MessageBuilder.withBody(消息的字节数组).setExperation("毫秒值").build();

3. 延迟消息插件的使用
    声明队列:和之前一样
    声明交换机:ExchangeBuilder.topicExchange("xxx").delayed().build();
    绑定关系:和之前一样
    
    发送消息:构造消息对象时,需要指定一个头
        MessageBuilder.withBody("消息内容字节数组").setHeader("x-delay", 毫秒值).build();

四、惰性队列

五、MQ集群

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/569098.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL — SQL 优化

文章目录 SQL 优化一、插入数据二、主键优化2.1 数据组织方式2.2 页分裂2.3 页合并2.4 主键设计原则 三、 Order by 优化3.0 排序方式讲解3.1 升序/降序联合索引结构图示3.2 总结 四、Group by优化五、limit优化六、 count优化七、update优化七、update优化 SQL 优化 一、插入…

【MySQL 数据库】5、存储引擎

目录 一、MySQL 体系结构二、存储引擎简介三、InnoDB 存储引擎四、MyISAM五、Memory六、三大存储引擎比较七、存储引擎的选择 一、MySQL 体系结构 连接层 最上层是一些客户端和链接服务&#xff0c;包含本地sock 通信和大多数基于客户端/服务端工具实现的类似于TCP/IP的通信。主…

07:MYSQL----多表查询

目录 1:多表查询概述 2:多表查询分类 3:内连接 3:外连接 4:自连接 5:联合查询-union&#xff0c;union all 6:子查询 1:多表查询概述 select * from emp , dept; emp:表中有6条数据, dept表中有5条数据只查询出来的数据为:30条 概述:指从多张表中查询数据 笛卡尔积…

在vite或者vue-cli中使用.env[mode]环境变量

在项目中总会遇到一些默认的配置,需要我们配置到静态文件中方便我们去获取,这时候就可以用到这个.env环境变量文件,在cli创建的项目中顶层的nodejs会有一个process对象,这个对象可以根据不同的环境获取不同的环境配置文件,但是vite中获取变量的方式不一样。 创建变量文件.env.…

如何编写接口自动化框架系列之requests详解(三)

目录 1.http协议 2.requests介绍 3.requests的主要功能 3.requests的主要功能 3.1 场景1-常用方法 3.2 场景2-通用方法 3.3 场景3-cookies认证方式 4.requests 在项目中的实践 4.1 在接口层实现一个接口 4.2 在测试用例层调用 4.3 项目总结 本文是接口自动化测试框架…

IOC初始化 IOC启动阶段 (Spring容器的启动流程)

[toc](IOC初始化 IOC启动阶段 (Spring容器的启动流程)) IOC初始化 IOC启动阶段 (Spring容器的启动流程) Resource定位过程&#xff1a;这个过程是指定位BeanDefinition的资源&#xff0c;也就是配置文件&#xff08;如xml&#xff09;的位置&#xff0c;并将其封装成Resource对…

Makefile基础教程(make的隐式规则)

文章目录 前言一、什么是make的隐式规则二、makefile中出现同名目标时三、一些常见的隐式规则四、查看隐式规则五、隐式规则缺点六、禁用隐式规则1.全局禁用2.局部禁用 总结 前言 本篇文章将给大家介绍make的隐式规则。 一、什么是make的隐式规则 Make 的隐式规则是指 Make …

css选择器及其权重

1. 类型选择器 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-wid…

【ZYNQ】裸机 PS + PL 双网口实现之 ZYNQ 配置

目前&#xff0c;在 ZYNQ 中进行以太网开发的方案&#xff0c;大部分都是基于通过 PS 的 MIO 以 RGMII 接口连接外部 PHY 芯片的方式。但是&#xff0c;由于使用 PS 的 MIO 只能以 RGMII 接口连接外部 PHY 芯片&#xff0c;这就限制了支持其他接口 PHY 芯片的使用&#xff0c;如…

分文件实现温湿度数据管理系统项目

目标&#xff1a; 了解分文件的概念&#xff0c;要依次从C语言的函数声明、变量的存储类别、C语言编译预处理&#xff0c;说起。这些知识点我们之前或多或少接触过&#xff0c;这里做个总结与拓展。经过总结&#xff0c;最后我们归纳出一个实现C语言模块化编程的技巧&#xff…

03-bootstrap-响应式布局-栅格系统

一、概述 1、栅格系统是 Bootstrap 中响应式布局的重要组成部分&#xff0c;旨在实现页面元素的自适应排版。Bootstrap 栅格系统将屏幕宽度分为 12 列&#xff0c;通过在 HTML 元素上添加相应的类名&#xff0c;可以让元素占据指定数量的列数&#xff0c;从而实现灵活的布局效…

5种易实现的Linux和 Windows VPS速度提升方法

​  无论是Linux VPS还是Windows VPS&#xff0c;网站速度的提高都是非常重要的。它们在提高网站速度方面都有很多的优化方法。下面我们将介绍 5 种提高网站速度的方法。 1.通过缓存加速 缓存通常是用来加快商业网站加载时间的技术&#xff0c;因此它也可以用在 VPS 上。没有…

车架号查车辆信息-vin查车辆信息api接口

接口地址&#xff1a; https://登录后显示/pyi/88/264(支持:http/https)) 在线查询&#xff1a;https://www.wapi.cn/car_vin.html 网站地址&#xff1a;https://www.wapi.cn 返回格式&#xff1a;json,xml 请求方式&#xff1a;GET,POST 请求说明&#xff1a; Md5验证方式-…

字符串、字符串列表,倒序生成字典。

带数字的字符串以数字为key倒序生成字典&#xff0c;字符串列表按其元素索引为key倒序生成字典。 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣经”教程《 python 完全自学教程》&#xff0c;不仅仅是基础那么简…

【MySQL】-- 表的操作

目录 表的操作 创建表 创建表案例 查看表结构 查看表结构案例 查看历史上表的创建语句 修改表 修改表实例 新增列属性 修改列属性 删除列属性 修改列名 修改表名 删除表 表的操作 创建表 语法&#xff1a; CREATE TABLE (if not exists) table_name (fie…

【MyBatisPlus框架】

文章目录 MyBatisPlus1.概述1.1 简介1.2特性1.3支持数据库1.4框架结构 2.入门案例2.1 创建数据库以及表2.2 创建工程2.2.1引入依赖 2.3编写代码 3.基本CRUD3.1BaseMapper3.2插入3.3删除3.4修改3.5查询3.6通用Service 4.常用注解4.1TableName4.1.1问题4.1.2通过TableName解决上述…

简述springmvc的流程

4、SpringMVC的执行流程 用户向服务器发送请求&#xff0c;请求被SpringMVC 前端控制器 DispatcherServlet捕获。 DispatcherServlet对请求URL进行解析&#xff0c;得到请求资源标识符&#xff08;URI&#xff09;&#xff0c;判断请求URI对应的映射&#xff1a; a) 不存在 …

day05 java_Spring IoC 和 DI

为什么使用spring框架 1.解耦代码(每次使用都要new一个对象) 2.解决事务繁琐问题(创建对象----初始化----调用方法销毁对象) 3.使用第三方框架麻烦的问题 总结:spring是一个轻量级的Ioc,Di和AOP容器 轻量级:简洁,高效,低依赖 **容器:**创建对象并将对象存储对象,同时管理…

高矿化度矿井水深度除氟装置CH-87技术解析

高矿化度矿井水是指含有高浓度溶解性矿物质的废水&#xff0c;通常指的是含有高浓度钠、钙、镁、铁、铝、钾等离子的废水。这些离子通常来自于废水所处的环境、工业或生产过程中使用的原材料和化学品。高矿化度的废水通常具有高盐度、高电导率、高硬度等特征&#xff0c;对环境…

性能测试计划不会写?我告诉你有模板你看不看

目录 1 简介 2 测试进入条件 3 测试退出条件 4 性能测试需求 5 测试风险 6 测试时机 7 测试策略 8 测试资源 9 测试进度 10 交付物 1 简介 1.1 目的 【描述性能测试计划的目的。】 1.2 背景 【描述项目或产品的背景。】 1.3范围 【描述性能测试的整体范围。】 2 测试进入条件 【…