Spring boot框架下的RabbitMQ消息中间件

news2025/1/18 20:10:42

1. RabbitMQ 基础概念

1.1 消息处理流程与组件配合

  1. Producer(生产者) 发送消息。消息先发送到 Exchange(交换机),而不是直接到队列。
  2. Exchange(交换机) 接收到消息后,根据 Routing Key(路由键)Binding(绑定规则),决定将消息发送到哪些 Queue(队列)
  3. Queue(队列) 存储消息,等待 Consumer(消费者) 消费。
  4. Consumer(消费者) 从队列中接收并处理消息。

Producer(生产者)

作用:负责发送消息到 RabbitMQ 的入口,指定消息的 Exchange 和 Routing Key。

关键点

  • Producer 只需要知道 Exchange 和 Routing Key,不关心队列。
  • Producer 不直接与队列交互,消息的路由和存储由 Exchange 和 Binding 决定。

代码示例

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        System.out.println("Sent message: " + message);
    }
}

调用示例

producer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");
  • direct-exchange:目标交换机。
  • key1:消息的路由键。

Exchange(交换机)

作用:接收来自 Producer 的消息,并根据 Routing Key 和 Binding 的配置,决定将消息发送到哪些队列。

Exchange 通常需要手动注册为 Bean。

  • RabbitMQ 的 Exchange 是通过名称来标识的。

  • 在 Spring Boot 中,您通过 @Bean 方法注册 Exchange 时,实际上是将 Exchange 的名称和类型绑定到 RabbitMQ 服务器。

  • 发送消息时,RabbitMQ 客户端会根据 Exchange 的名称找到对应的 Exchange,并根据 Routing Key 将消息路由到队列。

类型

  • Direct Exchange:精确匹配 Routing Key。消息的 Routing Key 必须与 Binding 的 Routing Key 完全一致。

  • Topic Exchange:支持通配符匹配。例如,with("key.*") 可以匹配 key.1key.2 等。

  • Fanout Exchange:忽略 Routing Key,消息会被广播到所有绑定的队列。

  • Headers Exchange:忽略 Routing Key,根据消息头属性匹配。

代码示例(定义交换机):

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExchangeConfig {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct-exchange");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout-exchange");
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic-exchange");
    }

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers-exchange");
    }
}

Queue(队列)

作用:消息的存储容器,等待消费者从中取出消息进行处理。

Queue 也需要手动注册为 Bean。Spring Boot 不会自动注册队列,因为队列的名称和属性(如是否持久化、是否排他等)需要根据业务需求进行配置。

关键点

  • 消息会保存在队列中,直到被消费。
  • 队列可以是持久化的(重启 RabbitMQ 后消息仍然存在)或非持久化的。

代码示例(定义队列):

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

    @Bean
    public Queue demoQueue() {
        return new Queue("demo-queue", true); // 持久化队列
    }
}

Routing Key(路由键)

作用:决定消息如何从交换机路由到队列。

关键点

  • Routing Key 由 Producer 指定。
  • 在 Direct 和 Topic 类型的 Exchange 中,Routing Key 决定队列是否接收消息。

Binding(绑定)

  • 作用:将队列与交换机连接,并定义路由规则。
  • 关键点
    • Binding 定义了队列接受消息的条件。
    • 结合 Routing Key 和交换机类型,共同决定消息的路由方式。

代码示例(定义绑定):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BindingConfig {

    @Bean
    public Binding binding(Queue demoQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");
    }
}

with("key1") 的作用是 指定 Binding 的 Routing Key。它的含义是:

  • 当消息发送到 Exchange 时,Exchange 会根据消息的 Routing Key 和 Binding 的 Routing Key 进行匹配。

  • 如果匹配成功,消息会被路由到对应的队列;如果匹配失败,消息会被丢弃或进入死信队列(如果有配置)。


Consumer(消费者)

作用:从队列中接收并处理消息。

关键点

  • 消费者与队列直接关联。
  • 多个消费者可以监听同一队列,实现负载均衡。

代码示例

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

1.2 RabbitMQ 消息传输模型

点对点模型

定义:消息从生产者发送到队列,由消费者从队列中接收,消息只能被一个消费者消费。

实现

  • 使用默认交换机(空字符串 "")。
  • 直接将消息发送到队列。

代码示例

rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");

发布订阅模型

定义:生产者将消息发送到 Fanout 类型的交换机,消息会广播到所有绑定的队列。

实现

  • 不需要 Routing Key。
  • 所有绑定到 Fanout 交换机的队列都会接收消息。

代码示例

rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");

路由模型

定义:生产者将消息发送到 Direct 类型的交换机,根据 Routing Key 精确匹配队列。

实现

  • 队列通过 Binding 绑定到交换机时,指定 Routing Key。
  • 消息的 Routing Key 必须与 Binding 的 Routing Key 一致。

代码示例

rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");

2. 环境准备

2.1 安装与配置 RabbitMQ

下载 Docker

  • 访问 Docker 官方网站:Docker: Accelerated Container Application Development。

  • 根据您的操作系统(Windows、macOS 或 Linux)下载并安装 Docker Desktop。

启动 Docker

  • 安装完成后,启动 Docker Desktop。

  • 确保 Docker 正在运行(任务栏或菜单栏中可以看到 Docker 图标)。

使用 Docker 快速部署 RabbitMQ

Docker 是部署 RabbitMQ 的最简单方式。通过以下命令,您可以快速启动一个 RabbitMQ 容器:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

参数说明

  • -d:以后台模式运行容器。

  • --name rabbitmq:为容器指定名称(rabbitmq)。

  • -p 5672:5672:将容器的 5672 端口映射到主机的 5672 端口(RabbitMQ 的消息通信端口)。

  • -p 15672:15672:将容器的 15672 端口映射到主机的 15672 端口(RabbitMQ 管理插件的 Web 界面端口)。

  • rabbitmq:management:使用带有管理插件的 RabbitMQ 镜像。

验证 RabbitMQ 是否运行

运行以下命令,查看容器是否正常运行:

docker ps

如果看到 rabbitmq 容器正在运行,说明 RabbitMQ 已成功启动。


2.2 使用 RabbitMQ 管理插件

RabbitMQ 提供了一个 Web 管理界面,方便您监控和管理 RabbitMQ。

访问管理界面

  1. 打开浏览器,访问 http://localhost:15672

  2. 使用默认用户名和密码登录:

    • 用户名:guest

    • 密码:guest

管理界面功能

  • Overview:查看 RabbitMQ 的整体状态,如连接数、队列数、消息速率等。

  • Connections:查看当前连接到 RabbitMQ 的客户端。

  • Channels:查看当前打开的通道。

  • Exchanges:查看和管理 Exchange。

  • Queues:查看和管理 Queue。

  • Admin:管理用户和权限。

2.3 用户与权限配置

默认情况下,RabbitMQ 只有一个用户 guest,密码也是 guest。为了安全性和权限管理,建议创建新用户并分配权限。

1. 创建新用户

在 RabbitMQ 管理界面中:

  • 点击顶部导航栏的 Admin

  • 在用户列表下方,点击 Add a user

  • 输入用户名和密码,例如:

    • 用户名:admin

    • 密码:admin123

  • 点击 Add user 完成创建。

2. 分配权限

  • 在用户列表中,找到刚创建的用户(如 admin)。

  • 点击用户右侧的 Set permission

  • 在权限设置页面:

    • Virtual Host:选择 /(默认的虚拟主机)。

    • Configure:输入 .*,表示允许用户配置所有资源。

    • Write:输入 .*,表示允许用户写入所有资源。

    • Read:输入 .*,表示允许用户读取所有资源。

  • 点击 Set permission 完成权限分配。

3. 使用新用户登录

  • 退出当前用户(点击右上角的 guest,选择 Log out)。

  • 使用新用户(如 admin)登录。

2.4  Spring Boot 中引入 RabbitMQ 依赖 

在 pom.xml 中添加以下依赖:

<dependencies>
    <!-- RabbitMQ 依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

spring-boot-starter-amqp 是 Spring Boot 提供的 RabbitMQ 集成依赖,它包含了以下内容:

  • RabbitMQ 客户端库

    • 自动引入 RabbitMQ 的 Java 客户端库(amqp-client),用于与 RabbitMQ 服务器通信。

  • Spring AMQP 支持

    • 提供了 Spring 对 AMQP(Advanced Message Queuing Protocol)的支持,包括 RabbitTemplate@RabbitListener 等。


2.5 Spring Boot 配置 RabbitMQ

在 Spring Boot 项目中,您需要在 application.properties 或 application.yml 中配置 RabbitMQ 的连接信息。

示例配置

# RabbitMQ 连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin123

配置说明

  • spring.rabbitmq.host:RabbitMQ 服务器地址(默认 localhost)。

  • spring.rabbitmq.port:RabbitMQ 消息通信端口(默认 5672)。

  • spring.rabbitmq.username:RabbitMQ 用户名。

  • spring.rabbitmq.password:RabbitMQ 密码。


3. Spring Boot 集成 RabbitMQ 的消息生产和消费

3.1 消息生产者(Producer)

在 Spring Boot 中,我们使用 RabbitTemplate 来发送消息。它由 spring-boot-starter-amqp 自动配置成为一个 Bean,可直接通过 @Autowired 注入。

如果 message 不是 String 类型的处理

  • Spring AMQP(spring-boot-starter-amqp)在使用 RabbitTemplate 时,默认的消息转换器(MessageConverter)通常会将对象序列化为 JSON 或者将字符串消息转换为字节。
  • 如果你的业务数据不是 String,常见做法是:
    1. 在发送时把非字符串对象序列化(如转换为 JSON 字符串);
    2. 或者配置自定义的 MessageConverter,让 Spring 帮你把对象自动序列化/反序列化。

典型做法:手动序列化为 JSON 再发送

@Service
public class CustomObjectProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCustomObject(String queueName, MyCustomObject obj) {
        // 1. 将自定义对象序列化为 JSON 字符串
        String jsonString = new Gson().toJson(obj);

        // 2. 发送 JSON 字符串到 RabbitMQ
        rabbitTemplate.convertAndSend(queueName, jsonString);
    }
}
  • 在消费者端,你也可以将消息(JSON 字符串)反序列化为 MyCustomObject

配置自定义 Converter(可选)

  • Spring AMQP 提供了 Jackson2JsonMessageConverter 等现成转换器。
@Configuration
public class RabbitConfig {

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    // 配置 RabbitTemplate 使用该转换器
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
}
  • 这样一来,rabbitTemplate.convertAndSend(queueName, myObject) 会自动把 myObject 转成 JSON 发送;消费者端则自动解析为同样的 Java 对象。

1)基本消息发送

场景
将消息直接发送到指定的队列,跳过交换机的路由,让 RabbitMQ 把消息放到这个队列中。

核心代码示例

@Service
public class BasicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;  // 1.自动注入的 RabbitTemplate

    /**
     * 2.发送基本消息到指定的队列
     * @param queueName  目标队列名称
     * @param message    消息内容
     */
    public void sendToQueue(String queueName, String message) {
        // 3.调用 convertAndSend,直接将消息放入指定队列
        rabbitTemplate.convertAndSend(queueName, message);
        System.out.println("Message sent to queue: " + queueName + ", content: " + message);
    }
}

代码详解

  • @Autowired private RabbitTemplate rabbitTemplate;`

    • Spring Boot 自动为我们配置了 RabbitTemplate,不用手动定义 Bean。

    • 通过依赖注入即可使用所有与 RabbitMQ 交互的方法。

  • public void sendToQueue(String queueName, String message)

    • 方法参数包括:

      • queueName: 目标队列的名称。

      • message: 要发送的字符串类型消息内容。

  • rabbitTemplate.convertAndSend(queueName, message)

    • convertAndSend 方法会将消息转换(转换为字节)并发送到指定队列。

    • 如果该队列不存在,RabbitMQ 会尝试自动创建(前提是 Broker 端配置允许自动创建队列)。


2)发送到交换机

场景
将消息发送到一个交换机(Exchange),再由交换机通过 Routing Key 将消息路由到匹配的队列中。

核心代码示例

@Service
public class ExchangeProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息到指定交换机
     * @param exchangeName  交换机名称
     * @param routingKey    路由键
     * @param message       消息内容
     */
    public void sendToExchange(String exchangeName, String routingKey, String message) {
        // 将消息发送到 exchangeName 指定的交换机,使用 routingKey 进行路由
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
        System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);
    }
}

代码详解

  • exchangeName

    • 要发送到的交换机名称,例如 "direct-exchange""fanout-exchange" 等。
  • routingKey

    • 路由键,用来匹配绑定(Binding)。
    • 例如:对 DirectExchange 而言,需要队列绑定时的路由键与发送时的路由键相同,消息才能到达队列。
  • rabbitTemplate.convertAndSend(exchangeName, routingKey, message)

    • 将消息先发送到交换机,再根据路由键将消息投递到目标队列。

3)发送带消息属性的消息

场景
需要为消息设置 TTL(过期时间)或优先级等属性,控制消息在队列中的行为。

核心代码示例

@Service
public class PropertyProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送带消息属性的消息(如 TTL, 优先级)
     */
    public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {
        // 1.创建 MessageProperties 对象,用于指定消息的属性
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("10000"); // 过期时间:10秒 (单位:毫秒)
        properties.setPriority(5);        // 优先级设为 5

        // 2.根据消息体和属性构建 Message 对象
        Message message = new Message(messageContent.getBytes(), properties);

        // 3.使用 send 方法(而非 convertAndSend)直接发送 Message 对象
        rabbitTemplate.send(exchange, routingKey, message);

        System.out.println("Message with properties sent: " + messageContent);
    }
}

代码详解

  • MessageProperties properties = new MessageProperties();

    • MessageProperties 用于设置 AMQP 协议层的各种消息头信息。
  • properties.setExpiration("10000");

    • setExpiration 设置消息的 TTL(Time-To-Live),单位是毫秒。
    • 如果到达时间后消息仍未被消费,RabbitMQ 会将其从队列中移除并送入死信队列(如果配置了死信队列)。
  • properties.setPriority(5);

    • 设置消息的优先级为 5,前提是队列本身需要支持优先级队列(创建队列时指定 x-max-priority)。
  • new Message(messageContent.getBytes(), properties)

    • 将纯文本消息转换为 Message 对象,结合了消息属性和消息体。
  • rabbitTemplate.send(exchange, routingKey, message);

    • convertAndSend 不同,它不会尝试进行消息转换(如 JSON、字符串),而是直接发送完整的 AMQP Message 对象。

Message 构造函数 

public Message(byte[] body, MessageProperties messageProperties) {
    this.body = body;
    this.messageProperties = messageProperties;
}
  • body:消息体的字节数组。
  • messageProperties:AMQP 的消息属性,包括 TTL、优先级、headers 等。、

如果消息体不是String类型

  • 手动转换为字节:你可以先将自定义对象转换为字节数组(例如通过 JSON 序列化或 Java 序列化),再放入 new Message(...) 的第一个参数。
MyCustomObject obj = new MyCustomObject();
// 假设你想用 JSON
String jsonString = new Gson().toJson(obj);
byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);

MessageProperties properties = new MessageProperties();
// 设置一些属性
Message message = new Message(body, properties);
  • 为什么不会自动转 JSON?使用 new Message(...) 构造方法是“纯” AMQP 层的做法,不会调用 Spring 的转换器,因此你必须自己处理序列化。
  • 使用 Message 构造函数 时,你必须自行处理对象到 byte[] 的转换(无论是字符串、JSON,还是其他格式)。
  • 如果想让 Spring AMQP 自动转换,你通常使用 rabbitTemplate.convertAndSend(Object msg) 这种高级 API,或者配置自定义 MessageConverter

3.2 消息消费者(Consumer)

消费者的核心功能是在指定的队列中监听消息,并根据配置的确认模式(自动确认或手动确认)对消息进行处理或拒绝。

1)监听队列并消费消息

核心代码示例(自动确认模式)

@Service
public class Consumer {

    /**
     * 使用注解 @RabbitListener 指定要监听的队列
     * 由于默认为 auto-ack 模式,
     * 当消息到达后,RabbitMQ 会自动确认并从队列中删除该消息。
     */
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(String message) {
        // 1.从 queueName 队列中取到的消息内容
        System.out.println("Received message: " + message);
        // 2.在 auto-ack 模式下,无需手动 ack
        //  如果这里出现异常,RabbitMQ 不会再次发送消息给消费者,消息会丢失。
    }
}

代码详解(自动确认模式)

  • @RabbitListener(queues = "demo-queue")

    • 声明监听名为 demo-queue 的队列。
    • 一旦有新消息到达该队列,就会自动回调此方法。
  • public void receiveMessage(String message)

    • 默认参数类型为字符串,当 RabbitMQ 收到消息后会尝试将其转换为 String 并注入到 message 中。
  • 自动确认(auto-ack)的风险

    • 如果消费者在处理消息时抛出异常,消息已经被 RabbitMQ 标记为“已确认”,不会再重新发送或进入死信队列,导致消息丢失。

2)确认机制

自动确认(auto-ack)
  • 行为

    • 当消费者从队列中获取消息后,RabbitMQ 会立即将该消息标记为已确认(acknowledged),并从队列中删除。

  • 问题

    • 如果消息处理失败(例如消费者抛出异常),消息已经被确认并从队列中删除,无法重新处理。

    • 如果消费者崩溃或断开连接,未处理的消息会丢失。

  • 适用场景

    • 对消息处理的可靠性要求不高的场景。


手动确认(manual-ack)

  • 行为

    • 消费者处理完消息后,必须显式调用 basicAck 方法确认消息。

    • 如果消息处理失败,可以调用 basicNack 或 basicReject 方法拒绝消息。

  • 优点

    • 确保消息处理的可靠性。

    • 支持消息重新入队或发送到死信队列。

  • 适用场景

    • 对消息处理的可靠性要求较高的场景。

核心代码示例:

@Service
public class ManualAckConsumer {

    /**
     * 在 application.properties 中配置:
     * spring.rabbitmq.listener.simple.acknowledge-mode=manual
     * 使得 RabbitMQ 使用手动确认模式
     */
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(Message message, Channel channel) throws IOException {
        try {
            // 1.从消息中获取消息体
            String body = new String(message.getBody());
            System.out.println("Processing message: " + body);

            // 2.如果业务处理成功,则调用 basicAck 手动确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {
            System.err.println("Message processing failed: " + e.getMessage());
            
            // 3.如果处理失败,需要决定是重新入队还是拒绝并进入死信队列
            // requeue = true  -> 重新入队
            // requeue = false -> 丢弃或进入死信队列(根据队列配置)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

代码详解

  • 配置手动确认

    • application.properties 添加
      spring.rabbitmq.listener.simple.acknowledge-mode=manual
      
    • 表示 Spring AMQP 使用手动确认模式(manual-ack)。
  • public void receiveMessage(Message message, Channel channel)

    • 与自动确认不同,这里不仅接收字符串,还接收了 org.springframework.amqp.core.Message 对象和 com.rabbitmq.client.Channel
    • Message:包含消息体(body)和消息属性(headers 等)。
    • Channel:给我们提供了 basicAck, basicNack, basicReject 等底层 AMQP 操作。
  • 手动确认成功

    • channel.basicAck(deliveryTag, multiple)
      • deliveryTag:本次消息的唯一标记,从 message.getMessageProperties().getDeliveryTag() 获取。
      • multiple = false:只确认当前这条消息。

basicAck(long deliveryTag, boolean multiple)

  • 这里的 deliveryTag 并不是在你构造 Message 时生成的,而是 RabbitMQ Broker 在投递消息给消费者时由底层 AMQP 协议自动分配的一个递增的序号
  • long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
  • 手动确认失败

    • channel.basicNack(deliveryTag, multiple, requeue)basicReject
      • requeue = true:将消息重新放回队列等待下一次消费(可能导致死循环,如处理一直失败)。
      • requeue = false:拒绝消息,若配置了死信队列,则进入死信队列;否则丢弃消息。

3)处理消费失败

自动确认模式下的处理

  • 在自动确认模式下,如果消息处理失败,RabbitMQ 不会重新发送消息,因为消息已经被确认并从队列中删除。

  • 问题

    • 消息丢失,无法重新处理。

手动确认模式下的处理

  • 在手动确认模式下,如果消息处理失败,可以通过以下方式处理:
  • 重新入队

    • 调用 basicNack 或 basicReject 方法,并将 requeue 参数设置为 true

    • 消息会重新进入队列,等待下一次消费。

  • 发送到死信队列

    • 调用 basicNack 或 basicReject 方法,并将 requeue 参数设置为 false

    • 如果队列配置了死信队列,消息会被发送到死信队列。

重试机制(Spring AMQP 提供的简单重试)(只支持手动确认机制)

是重试失败了才会将消息重新入队 ,所以重试在前,重新入队在后

# 启用重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始重试间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 间隔倍数
spring.rabbitmq.listener.simple.retry.multiplier=2.0
# 最大重试间隔
spring.rabbitmq.listener.simple.retry.max-interval=10000
  • Spring AMQP 提供了 重试机制,可以在消费者处理消息失败时,自动进行多次重试,而不是直接将消息重新入队。

行为

  • 当消息处理失败时,Spring AMQP 会在 本地 进行重试(即不将消息重新入队),直到达到最大重试次数。

  • 如果重试次数用尽,消息会被拒绝(basicNack 或 basicReject),并根据配置决定是否重新入队或发送到死信队列。

死信队列(DLQ)

  • 当消息被拒绝或过期时,RabbitMQ 会将其发送到我们配置的死信交换机(DLX),再路由到死信队列(DLQ)。

  • 配置示例

    @Configuration
    public class RabbitConfig {
    
        @Bean
        public Queue normalQueue() {
            return QueueBuilder.durable("normal-queue")
                    .withArgument("x-dead-letter-exchange", "dead-letter-exchange")  // 指定死信交换机
                    .withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") // 指定死信路由键
                    .build();
        }
    
        @Bean
        public DirectExchange deadLetterExchange() {
            return new DirectExchange("dead-letter-exchange");
        }
    
        @Bean
        public Queue deadLetterQueue() {
            return new Queue("dead-letter-queue");
        }
    
        @Bean
        public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
            return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");
        }
    }
    
  • 原理

    • 正常队列通过 x-dead-letter-exchange 指定死信交换机,一旦消息被拒绝(requeue=false)或超时(TTL 到期),RabbitMQ 会把消息发送到 dead-letter-exchange
    • dead-letter-exchangedead-letter-queue 进行绑定(路由键 dead-letter-routing-key),从而实现死信队列的存储。
  • 重新入队 vs 发送到死信队列

    • 重新入队channel.basicNack(deliveryTag, false, true)
      • 适用于临时性错误,比如数据库锁冲突、网络抖动等,等待后续重新处理。
    • 发送到死信队列channel.basicNack(deliveryTag, false, false)
      • 适用于永久性错误,比如消息格式无法解析,或业务逻辑指定不应再尝试。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2278623.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2025.1.15——四、布尔注入

题目来源&#xff1a;ctfhub技能树 目录 一、基本操作&#xff1a;整理已知信息&#xff0c;得到本题为布尔注入 方法一&#xff1a;手工盲注&#xff08;不推荐&#xff09; step 1&#xff1a;判断具体形式 step 2&#xff1a;查询字段数 step 3&#xff1a;通过回显判…

PE文件:节表-添加节

在所有节的空白区域都不够存放我们想要添加的数据时&#xff0c;这个时候可以通过添加节来扩展我们可操作的空间去存储新的数据&#xff08;如导入表、代码或资源&#xff09;。 过程步骤 1.判断是否有足够的空间添加节表 PE文件的节表紧跟在PE头之后&#xff0c;每个节表的…

【前端动效】HTML + CSS 实现打字机效果

目录 1. 效果展示 2. 思路分析 2.1 难点 2.2 实现思路 3. 代码实现 3.1 html部分 3.2 css部分 3.3 完整代码 4. 总结 1. 效果展示 如图所示&#xff0c;这次带来的是一个有趣的“擦除”效果&#xff0c;也可以叫做打字机效果&#xff0c;其中一段文本从左到右逐渐从…

Python基于Django的图像去雾算法研究和系统实现(附源码,文档说明)

博主介绍&#xff1a;✌IT徐师兄、7年大厂程序员经历。全网粉丝15W、csdn博客专家、掘金/华为云//InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3…

了解 BM25:一种高效的文本检索算法

什么是 BM25&#xff1f; BM25&#xff08;Best Matching 25&#xff09;是一种在信息检索领域非常著名的算法&#xff0c;它属于 TF-IDF 的改进版本&#xff0c;是许多现代搜索引擎和文本检索系统的核心算法之一。BM25 基于概率检索模型&#xff08;Probabilistic Informatio…

PenGymy论文阅读

这里发现idea被人家先发了&#xff0c;没办法&#xff0c;资料收集的不够全面&#xff0c;现在来学习一下这个项目 这篇论文的贡献如下&#xff1a; 总的来说&#xff0c;他的主要工作是构建逼真的仿真环境&#xff0c;然后根据这个仿真环境生成真实的靶场&#xff0c;使得这个…

猫贫血吃什么能快速补血?

各位铲屎官们&#xff0c;看到自家猫咪无精打采、小脸苍白&#xff0c;是不是特别心疼&#xff1f;贫血可是猫咪健康的大敌&#xff0c;今天就来给大家支支招&#xff0c;哪些食物和方法能让猫咪快速补血&#xff0c;恢复活力&#xff01; 一、红肉及内脏类 红肉是补血的“主力…

Redis 性能优化:多维度技术解析与实战策略

文章目录 1 基准性能2 使用 slowlog 优化耗时命令3 big key 优化4 使用 lazy free 特性5 缩短键值对的存储长度6 设置键值的过期时间7 禁用耗时长的查询命令8 使用 Pipeline 批量操作数据9 避免大量数据同时失效10 客户端使用优化11 限制 Redis 内存大小12 使用物理机而非虚拟机…

wireshark抓路由器上的包 抓包路由器数据

文字目录 抓包流程概述设置抓包配置选项 设置信道设置无线数据包加密信息设置MAC地址过滤器 抓取联网过程 抓包流程概述 使用Omnipeek软件分析网络数据包的流程大概可以分为以下几个步骤&#xff1a; 扫描路由器信息&#xff0c;确定抓包信道&#xff1b;设置连接路由器的…

在 Fluent 网格划分中使用薄网格特征

薄体模型的网格划分策略 薄体网格划分对于有效模拟薄壁结构或厚度明显小于其他尺寸的几何形状非常有利。当使用此类几何结构时&#xff0c;传统的体积网格划分技术可能会导致单元数量增加&#xff0c;因为它们试图捕获具有许多不必要单元的薄尺寸。薄体网格划分通过专门沿薄方…

大模型WebUI:Gradio全解11——Chatbot:融合大模型的多模态聊天机器人(6)

大模型WebUI&#xff1a;Gradio全解11——Chatbot&#xff1a;融合大模型的多模态聊天机器人&#xff08;6&#xff09; 前言本篇摘要11. Chatbot&#xff1a;融合大模型的多模态聊天机器人11.6 为LLM Agent构建UI11.5.1 使用代理构建1. 使用transformers.agents的实际示例2. 使…

Linux-----线程同步(资源竞争和同步锁)

目录 资源竞争&#xff08;背景&#xff09; 锁&#xff08;解决方式&#xff0c;实现同步&#xff09; 互斥锁 读写锁 自旋锁 资源竞争&#xff08;背景&#xff09; 竞态条件 当多个线程并发访问和修改同一个共享资源&#xff08;如全局变量&#xff09;时&#xff0c;…

vue2 web 多标签输入框 elinput是否当前焦点

又来分享一点点工作积累及解决方案 产品中需要用户输入一些文字后按下回车键生成标签来显示在页面上&#xff0c;经过尝试与改造完成如下&#xff1a; <template><div class"tags-view" click"beginInput"><el-tag :key"index" …

Python学习(十)IO编程(文件读写、StringIO和BytesIO、操作文件和目录、序列化)

目录 一、什么是IO编程&#xff1f;二、文件读写1&#xff09;读文件2&#xff09;file-like Object3&#xff09;二进制文件4&#xff09;字符编码5&#xff09;写文件 三、StringIO 和 BytesIO1&#xff09;StringIO2&#xff09;BytesIO 四、操作文件和目录1&#xff09;操作…

5、docker-compose和docker-harbor

安装部署docker-compose 自动编排工具&#xff0c;可以根据dockerfile自动化的部署docker容器。是yaml文件格式&#xff0c;注意缩进。 1、安装docker-compose 2、配置compose配置文件docker-compose.yml 3、运行docker-compose.yml -f&#xff1a;指定文件&#xff0c;up&…

JS宏进阶: 工厂函数与构造函数

一、构造函数 在JavaScript中&#xff0c;构造函数是一种用于创建和初始化对象的特殊函数。构造函数的名字通常以大写字母开头&#xff0c;以区分于普通函数。通过new关键字调用构造函数&#xff0c;可以创建一个新的实例对象&#xff0c;并自动执行构造函数内部的代码来初始化…

uniapp 微信小程序 editor 富文本编辑器

<view class"inp boxsizing"><view class"contentBox"><!-- 富文本编辑器 --><view classwrapper><view classtoolbar tap"format"><view :class"formats.bold ? ql-active : " class"iconfon…

Python根据图片生成学生excel成绩表

学习笔记&#xff1a; 上完整代码 import os import re from openpyxl import Workbook, load_workbook from openpyxl.drawing.image import Image as ExcelImage from PIL import Image as PilImage# 定义图片路径和Excel文件路径 image_dir ./resources/stupics # 图片所…

在VMwareFusion中使用Ubuntu

在VMwareFusion使用Ubuntu 在VMwareFusion使用Ubuntu背景在VMwareFusion虚拟机里使用Ubuntu1、集成桌面工具2、主机和虚拟机之间共享剪贴板内容3、设置root用户密码4、设置静态ip4.1、静态ip和动态ip的区别4.2、查看当前ip4.2、linux网络配置文件所在位置4.3、基于ubuntu22.04.…

农业农村大数据应用场景|珈和科技“数字乡村一张图”解决方案

近年来&#xff0c;珈和科技持续深耕农业领域&#xff0c;聚焦时空数据服务智慧农业。 珈和利用遥感大数据、云计算、移动互联网、物联网、人工智能等先进技术&#xff0c;搭建“天空地一体化”监测体系&#xff0c;并创新建设了150的全球领先算法模型&#xff0c;广泛应用于高…