深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念

news2024/12/28 18:22:03

文章目录

  • 文章导图
  • RabbitMQ架构及相关概念
    • 四大核心概念
    • 名词解读
  • 七大工作模式及四大交换机类型
    • 0、前置了解-默认交换机DirectExchange
    • 1、简单模式(Simple Queue)-默认DirectExchange
    • 2、 工作队列模式(Work Queues)-默认DirectExchange
    • 3、发布/订阅模式(Publish/Subscribe)-FanoutExchange
    • 4、路由模式(Routing)-自定义DirectExchange
    • 5、主题模式(Topics)-TopicExchange
    • 总结
  • 三种队列类型
    • 普通队列
    • 死信队列(Dead Letter Queue, DLQ)
      • 定义
      • 触发条件
      • 应用场景
      • 配置
    • 延迟队列(Delayed Queue)
      • 定义
      • 实现方式
      • 应用场景
    • 两者区别
    • 代码实战
      • 1. 延迟队列:TTL+DLX死信队列
        • 配置步骤
      • 2. 延迟队列:RabbitMQ延迟消息插件
        • 配置步骤
      • 3、死信队列: basic.reject或basic.nack
        • 1. 引入依赖
        • 2. 配置交换机、队列和死信队列
        • 3. 生产者发送消息
        • 4. 消费者监听并拒绝消息
        • 5. 消费者监听死信队列
        • 总结

RabbitMQ系列文章
深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念
TODO:RabbitMQ可靠性
TODO:RabbtiMQ顺序性
TODO:RabbitMQ常见问题整理

文章导图

在这里插入图片描述

RabbitMQ架构及相关概念

在这里插入图片描述

四大核心概念

生产者

产生数据发送消息的程序是生产者。

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定 。

队列

队列是RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 。

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者

名词解读

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类 似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务 时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接 Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接, 如果应用程序支持多线程,通常每个thread创建单独的
  • channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走 Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。
  • Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

七大工作模式及四大交换机类型

网上查了很多资料有的说是五种,有的说是四种,可以看到在RabbitMQ在官网提到的共有7种工作模式:https://www.rabbitmq.com/tutorials

第6种是RPC调用,这个我们正常肯定不用这个实现RPC,而第7种是confirm 确认模式,可以用于保证生产者消息发送的可靠性,这个我后面会再专门介绍。
现在我们主要讲前5种工作模式,实际上总结来说5种又可以总结为是3种,其实第1、2、4根据他们都是Direct交换机可以归结为一种,下文我会详细讲解一下。

在这里插入图片描述

0、前置了解-默认交换机DirectExchange

RabbitMQ有一个自带的交换机,也被称为AMQP default exchange。当消息发送到RabbitMQ时,如果没有指定交换机,就会被发送到默认交换机。默认交换机的类型为direct类型,路由键与队列名相同

如果消息的路由键和某个队列的名称一致,那么消息就会被发送到这个队列中。如果消息的路由键和任何一个队列的名称都不一致,那么消息会被丢弃。 默认交换机可以通过设置routing_key来指定消息的目的地,例如:

//  将消息发送到名称为test_queue的队列中,空字符串代表默认交换机
channel.basic_publish(exchange="", routing_key="test_queue", body="Hello, RabbitMQ!")

但是,建议应用程序在发送消息时显式地指定交换机,以避免不必要的麻烦或错误。默认交换机只是一个简单的机制,不应被用于复杂的应用程序。

1、简单模式(Simple Queue)-默认DirectExchange

这个和别的几种模式对比看着没有X,这个其实用了默认的交换机,我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下:

在这里插入图片描述

//Config
@Bean
Queue queue1() {
    return new Queue("simpleQueue");
}

// 生产者
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendSimpleMessage(String message) {
    rabbitTemplate.convertAndSend("simpleQueue", message);
}

// 消费者
@RabbitListener(queues = "simpleQueue")
public void receiveSimpleMessage(String message) {
    System.out.println("Received: " + message);
}

这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “simpleQueue”,则 routingkey 为 “simpleQueue” 的消息会被该消息队列接收。
具体可以看下源码发送convertAndSend

/** Alias for amq.direct default exchange. */
private static final String DEFAULT_EXCHANGE = "";

private static final String DEFAULT_ROUTING_KEY = "";

private volatile String exchange = DEFAULT_EXCHANGE;
private volatile String routingKey = DEFAULT_ROUTING_KEY;

@Override
public void convertAndSend(String routingKey, final Object object) throws AmqpException {
	//可以发现这个this.exchange就是DEFAULT_EXCHANGE = ""
    convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);
}

2、 工作队列模式(Work Queues)-默认DirectExchange

这种情况是这样的:

在这里插入图片描述

一个生产者,也是一个默认的交换机(DirectExchange),一个队列,两个消费者。
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。

和第一种对比主要体现在有多个消费者进行消费,主要优势在于可以通过增加消费者来提高处理能力。

//Config
@Bean
Queue queue1() {
    return new Queue("workQueue");
}

// Producer
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendWorkMessage(String message) {
    rabbitTemplate.convertAndSend("workQueue", message);
}

// Consumer
@RabbitListener(queues = "workQueue")
public void receiveWorkMessage(String message) {
    System.out.println("Received: " + message);
    // Simulate work
    Thread.sleep(1000);
}

3、发布/订阅模式(Publish/Subscribe)-FanoutExchange

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:

在这里插入图片描述

在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。

//Config
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanoutExchange");
}

@Bean
public Queue fanoutQueue1() {
    return new Queue("fanoutQueue1");
}

@Bean
public Queue fanoutQueue2() {
    return new Queue("fanoutQueue2");
}

@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
    return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
    return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}

接下来创建两个消费者,两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息,如下:

// Producer
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendFanoutMessage(String message) {
    rabbitTemplate.convertAndSend("fanoutExchange", null, message);
}

// Consumer
@RabbitListener(queues = "fanoutQueue1")
public void receiveFanoutMessage1(String message) {
    System.out.println("Queue1 Received: " + message);
}

@RabbitListener(queues = "fanoutQueue2")
public void receiveFanoutMessage2(String message) {
    System.out.println("Queue2 Received: " + message);
}

注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null

4、路由模式(Routing)-自定义DirectExchange

DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “directQueue1”,则 routingkey 为 “directQueue1” 的消息会被该消息队列接收。

在这里插入图片描述

// Config
@Bean
public DirectExchange directExchange() {
    return new DirectExchange("directExchange");
}

@Bean
public Queue directQueue1() {
    return new Queue("directQueue1");
}

@Bean
public Queue directQueue2() {
    return new Queue("directQueue2");
}

@Bean
public Binding directBinding1(DirectExchange directExchange, Queue directQueue1) {
    return BindingBuilder.bind(directQueue1).to(directExchange).with("info");
}

@Bean
public Binding directBinding2(DirectExchange directExchange, Queue directQueue2) {
    return BindingBuilder.bind(directQueue2).to(directExchange).with("error");
}

可以发现我们可以根据routingKey控制发送到哪个队列上,这个本质上和我们前面2种模式都是一样的,采用的都是DirectExchange,只不过前面2种的交换机DirectExchange是""默认值,现在我们这里是指定了自己的DirectExchange

// Producer
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendDirectMessage(String message, String routingKey) {
    rabbitTemplate.convertAndSend("directExchange", routingKey, message);
}

// Consumer
@RabbitListener(queues = "directQueue1")
public void receiveDirectMessage1(String message) {
    System.out.println("Queue1 Received: " + message);
}

@RabbitListener(queues = "directQueue2")
public void receiveDirectMessage2(String message) {
    System.out.println("Queue2 Received: " + message);
}

特别注意:如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。

5、主题模式(Topics)-TopicExchange

在这里插入图片描述

在 RabbitMQ 的主题模式(Topics)中,消息通过带有路由键的主题交换机(TopicExchange)进行路由。消息的路由键是一个点分隔的字符串,消费者可以使用绑定键(带有通配符)来订阅感兴趣的消息。

  • 队列 topicQueue1 使用绑定键 *.orange.*,匹配任意第一个和第三个单词,以 orange 为第二个单词的消息。
  • 队列 topicQueue2 使用绑定键 *.*.rabbit,匹配任意前两个单词,以 rabbit 为第三个单词的消息。
// Config
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topicExchange");
}

@Bean
public Queue topicQueue1() {
    return new Queue("topicQueue1");
}

@Bean
public Queue topicQueue2() {
    return new Queue("topicQueue2");
}

@Bean
public Binding topicBinding1(TopicExchange topicExchange, Queue topicQueue1) {
    return BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.orange.*");
}

@Bean
public Binding topicBinding2(TopicExchange topicExchange, Queue topicQueue2) {
    return BindingBuilder.bind(topicQueue2).to(topicExchange).with("*.*.rabbit");
}

  • topicQueue1topicQueue2 接收匹配其绑定键的消息。
  • 灵活路由: 主题模式允许通过复杂的路由键实现灵活的消息路由。
  • 使用场景: 适用于需要按模式匹配路由消息的场景,比如日志分级、区域性数据分发等。
// Producer
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendTopicMessage(String message, String routingKey) {
    rabbitTemplate.convertAndSend("topicExchange", routingKey, message);
}

// Consumer
@RabbitListener(queues = "topicQueue1")
public void receiveTopicMessage1(String message) {
    System.out.println("Queue1 Received: " + message);
}

@RabbitListener(queues = "topicQueue2")
public void receiveTopicMessage2(String message) {
    System.out.println("Queue2 Received: " + message);
}

总结

看了上面的5个例子,其实本质上我们可以根据Exchange交换机类型归结为3种工作模式Direct、Fanout、Topic

  • Direct:定向,把消息交给符合指定routing key 的队列 (第1、2、4其实都是这种交换机)
  • Fanout:广播,将消息交给所有绑定到交换机的队列 第**(第3种模式)**
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列**(第5种模式)**

这里提一下,交换机还有一种类型,Headers:头匹配,基于MQ的消息头匹配,不过这种用的非常少,可以忽略!

不难发现,这三种类型本质上是告诉交换机应该把消息发送给哪些那些队列的,三种类别对应着三种判断角度

  • direct —— 消息发送时都附带一个字段叫routing_key,direct 模式的交换机就会直接把该字段理解成队列名,找到对应的队列并发送;
  • fanout —— 相当于广播,不作任何选择,发送给所有连接的队列;
  • topic —— 交换机会把routing_key理解成一个主题,恰好,队列绑定交换机时也可以以缩略形式指定主题,所以找到匹配主题的队列就发送;

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

在这里插入图片描述

三种队列类型

普通队列

我们平常发送的正常都是普通队列,比如上面5种工作模式说的都是普通队列,就不多说了

死信队列(Dead Letter Queue, DLQ)

特别注意:

  • 队列和消息都有个TTL生存时间,队列的TTL到达后队列会自动删除,消息不会进入死信队列;
  • 消息的生存时间到达后会进入死信队列。消息的生存时间可以在队列设置所有消息的TTL,也可以对某个消息单独设置TTL。

定义

死信队列是用于处理无法被消费者正确处理的消息的队列。当消息在原始队列中无法被消费时,会被转移到死信队列中。

触发条件

消息会变成死信并进入死信队列的几种情况:

  1. 消息被消费者拒绝(通过basic.rejectbasic.nack),并且requeue=false
  2. 消息在队列中超过了TTL(Time To Live)时间。
  3. 队列达到最大长度限制,无法再接收新消息。

在这里插入图片描述

应用场景

  • 处理无法被消费的消息,避免消息堆积影响其他消息的消费。
  • 记录和监控消息处理错误,方便进行后续处理

配置

  • 通过设置 x-dead-letter-exchangex-dead-letter-routing-key 将消息路由到死信队列。
  • 在原始队列中设置死信交换机和死信队列的相关参数

延迟队列(Delayed Queue)

定义

延迟队列是一种特殊的队列,消息在发送到队列后,需要等待一段时间后才能被消费。

实现方式

  1. 通过死信队列实现延迟任务

    把死信队列就当成延迟队列,具体来说是这样:

    假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。

    • 将消息发送到一个没有消费者的队列,设置消息的TTL。
    • 消息过期后进入死信队列,再由死信队列的消费者处理。
  2. 通过RabbitMQ延迟插件

    • 使用RabbitMQ的延迟插件(rabbitmq_delayed_message_exchange 插件),消息在延迟一段时间后再投递到目标队列中进行消费。

应用场景

  • 订单超时未支付自动取消。
  • 用户注册后未登录的提醒。
  • 预定会议前的通知

两者区别

使用TTL和死信队列实现延迟插件其实是会有一些问题的:

  • 问题一:当我们的业务比较复杂的时候, 需要针对不同的业务消息类型设置不同的过期时间策略, 必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象, 当业务复杂到一定程度时, 这种方式维护成本过高;
  • 问题二:就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的;

延迟交换机插件可以在一定程度上解决上述两种问题。

特性死信队列延迟队列
定义处理无法被消费的消息消息在指定时间后才被消费
触发条件消息被拒绝、消息过期、队列满消息设置了TTL或使用延迟插件
应用场景处理消费失败的消息,避免队列堵塞订单超时取消、提醒通知等延迟处理场景
实现方式配置死信交换机和死信队列使用TTL和死信队列或延迟插件
消息处理进入死信队列后进行特殊处理延迟一段时间后再投递到目标队列

代码实战

1. 延迟队列:TTL+DLX死信队列

配置步骤

1、引入依赖

pom.xml中引入Spring Boot和RabbitMQ的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置交换机和队列

在Spring Boot的配置类中,配置一个普通队列、一个死信队列、一个普通交换机和一个死信交换机:

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // 普通交换机
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal_exchange");
    }

    // 死信交换机
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead_letter_exchange");
    }

    // 普通队列并绑定到普通交换机
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal_queue")
                .withArgument("x-dead-letter-exchange", "dead_letter_exchange")
                .withArgument("x-dead-letter-routing-key", "dead_letter_routing_key")
                .build();
    }

    @Bean
    public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("normal_routing_key");
    }

    // 死信队列并绑定到死信交换机
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dead_letter_queue");
    }

    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("dead_letter_routing_key");
    }
}

3、生产者发送消息

在生产者发送消息时,可以指定消息的TTL(Time-To-Live)。TTL到期后,消息会被转发到死信队列:

  • 创建了一个匿名内部类实现了MessagePostProcessor接口,并重写了postProcessMessage()方法。在该方法中,设置了消息的延迟时间为传进来的delay延迟时间
java复制代码import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send(@RequestParam String message, @RequestParam int delay) {
        rabbitTemplate.convertAndSend("normal_exchange", "normal_routing_key", message, msg -> {
			//创建了一个匿名内部类实现了MessagePostProcessor接口,并重写了postProcessMessage()方法。在该方法中,设置了消息的延迟时间为传进来的delay延迟时间
            msg.getMessageProperties().setExpiration(String.valueOf(delay));
            return msg;
        });
        return "Message sent with delay: " + delay;
    }
}

4、消费者监听死信队列

消费者监听死信队列,接收到消息后处理:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @RabbitListener(queues = "dead_letter_queue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

2. 延迟队列:RabbitMQ延迟消息插件

RabbitMQ有一个插件 rabbitmq-delayed-message-exchange 可以直接支持延迟消息队列。

配置步骤

1、安装RabbitMQ延迟消息插件

首先,确保RabbitMQ服务器上已安装rabbitmq-delayed-message-exchange插件。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange/21、**配置交换机和队列**

2、在Spring Boot中配置使用延迟消息交换机:

  • 使用CustomExchange类创建一个自定义交换机对象。CustomExchange是Spring AMQP库提供的一个类,用于创建自定义的交换机。构造方法的参数依次为交换机的名称、类型、是否持久化、是否自动删除和属性。
  • 交换机的名称为DELAYED_EXCHANGE,类型为"x-delayed-message",持久化为true,自动删除为false,属性为之前创建的HashMap对象。
java复制代码import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitDelayedConfig {

    @Bean
    public CustomExchange delayedExchange() {
        return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, Map.of("x-delayed-type", "direct"));
    }

    @Bean
    public Queue delayedQueue() {
        return new Queue("delayed_queue");
    }

    @Bean
    public Binding delayedBinding(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("delayed_routing_key").noargs();
    }
}

3、生产者发送消息

生产者在发送消息时,可以设置延迟时间:

java复制代码import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
public class DelayedProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendDelayed")
    public String sendDelayed(@RequestParam String message, @RequestParam int delay) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delay);
        rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routing_key", message, msg -> {
            msg.getMessageProperties().getHeaders().putAll(headers);
            return msg;
        });
        return "Delayed message sent with delay: " + delay;
    }
}

4、消费者监听延迟队列

与TTL+DLX方法相同,消费者直接监听队列接收消息:

java复制代码import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DelayedConsumer {

    @RabbitListener(queues = "delayed_queue")
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}

3、死信队列: basic.reject或basic.nack

死信队列有3种情况: 这里就举常见的手动ack的情况拒绝消息实现死信队列

要在Spring Boot中使用RabbitMQ实现死信队列(Dead Letter Queue,DLQ),并处理消息被消费者拒绝的情况(通过basic.rejectbasic.nack并且requeue=false),可以按照以下步骤来实现。

1. 引入依赖

首先,在pom.xml中引入Spring Boot和RabbitMQ的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置交换机、队列和死信队列

接下来,在Spring Boot的配置类中配置一个普通队列、一个死信队列、一个普通交换机和一个死信交换机。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // 普通交换机
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal_exchange");
    }

    // 死信交换机
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead_letter_exchange");
    }

    // 普通队列并绑定到普通交换机
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal_queue")
                .withArgument("x-dead-letter-exchange", "dead_letter_exchange") // 设置死信交换机
                .withArgument("x-dead-letter-routing-key", "dead_letter_routing_key") // 设置死信RoutingKey
                .build();
    }

    @Bean
    public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("normal_routing_key");
    }

    // 死信队列并绑定到死信交换机
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dead_letter_queue");
    }

    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("dead_letter_routing_key");
    }
}
3. 生产者发送消息

在生产者中发送消息到普通队列:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send(@RequestParam String message) {
        rabbitTemplate.convertAndSend("normal_exchange", "normal_routing_key", message);
        return "Message sent: " + message;
    }
}
4. 消费者监听并拒绝消息

注意这里的前提是要开启手动ack:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual  # 手动ack

消费者监听普通队列并有条件地拒绝消息,将消息转发到死信队列:

  • 当发送的消息内容为"reject"时,该消息会被拒绝并转发到死信队列。
  • 当发送其他内容的消息时,消息会被正常消费。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;

@Component
public class Consumer {

    @RabbitListener(queues = "normal_queue")
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            String msg = new String(message.getBody());
            System.out.println("Received message: " + msg);

            // 根据某些条件判断是否拒绝消息
            if ("reject".equals(msg)) {
                // 拒绝消息,并且不重新入队(requeue=false)
                channel.basicReject(tag, false);
                System.out.println("Message rejected: " + msg);
            } else {
                // 消费成功,确认消息
                channel.basicAck(tag, false);
            }
        } catch (Exception e) {
            // 异常情况也可以使用basicNack将消息拒绝,并且不重新入队
            channel.basicNack(tag, false, false);
        }
    }
}
5. 消费者监听死信队列

最后,消费者监听死信队列,处理被拒绝的消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DeadLetterConsumer {

    @RabbitListener(queues = "dead_letter_queue")
    public void receiveDeadLetterMessage(String message) {
        System.out.println("Received dead letter message: " + message);
    }
}
总结
  • 配置普通队列和死信队列,并通过设置x-dead-letter-exchangex-dead-letter-routing-key来实现消息被拒绝后的处理。
  • 消费者可以根据业务逻辑通过basic.rejectbasic.nack拒绝消息,并指定不重新入队(requeue=false),从而将消息转发到死信队列。
  • 死信队列中的消息可以被另一个消费者监听和处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2114072.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

医院检验系统LIS源码,LIS系统的定义、功能结构以及样本管理的操作流程

本文将对医院检验系统LIS进行介绍&#xff0c;包括LIS系统的定义、功能结构以及样本管理的操作流程方面。 LIS系统定义 LIS系统&#xff08;Laboratory Information System&#xff09;是一种专门为临床检验实验室开发的信息管理系统&#xff0c;其主要功能包括实验室信息管理…

攻防世界 supersqli

supersqli 一般sql语句的题都是先判断&#xff0c;经过测试&#xff0c;是单引号注入 999 union select database(),2#可以发现很多关键字都被过滤了select&#xff0c;所以联合查询&#xff0c;报错注入&#xff0c;布尔和时间盲注都不能用了&#xff0c;可以想到堆叠注入。…

【重学 MySQL】十六、算术运算符的使用

【重学 MySQL】十六、算术运算符的使用 加法 ()减法 (-)乘法 (*)除法 (/ 或 div )取模&#xff08;求余数&#xff09; (% 或 mod )注意事项 在 MySQL 中&#xff0c;算术运算符用于执行数学运算&#xff0c;如加法、减法、乘法、除法和取模&#xff08;求余数&#xff09;等。…

数字逻辑设计基础

参考&#xff1a; 正点原子逻辑设计指南 状态机FSM 目录 一、组合逻辑组合逻辑中的竞争-冒险现象组合逻辑毛刺消除 二、时序逻辑锁存器触发器寄存器计数器 三、边沿检测电路四、格雷码转换电路五、复位电路数字逻辑需要复位的原因同步复位异步复位异步复位、同步释放 六、状…

C++---基础概念

1 命名空间 在C/C中&#xff0c;变量、函数和后面要学到的类都是大量存在的&#xff0c;这些变量、函数和类的名称将都存 在于全局作用域中&#xff0c;可能会导致很多冲突。使用命名空间的目的是对标识符的名称进行本地化&#xff0c; 以避免命名冲突或名字污染&#xff0c;n…

Redis - 缓存

文章目录 目录 文章目录 1. 什么是缓存&#xff1f; 2. 使用Redis作为缓存 2.1 关系型数据库的缺点 3. 缓存的更新策略 3.1 定期生成 3.2 实时生成 缓存淘汰策略 4. 缓存预热, 缓存穿透, 缓存雪崩 和 缓存击穿 缓存预热 缓存穿透 缓存雪崩 缓存击穿 总结 1. 什么…

Web安全之XSS跨站脚本攻击:如何预防及解决

1. 什么是XSS注入 XSS&#xff08;跨站脚本攻击&#xff0c;Cross-Site Scripting&#xff09;是一种常见的Web安全漏洞&#xff0c;通过注入恶意代码&#xff08;通常是JavaScript&#xff09;到目标网站的网页中&#xff0c;以此在用户浏览网页时执行。攻击者可以通过XSS获取…

特征值分解(EVD)和奇异值分解(SVD)—应用于图片压缩

特征值分解(EVD)和奇异值分解(SVD)—应用于图片压缩 目录 前言 一、特征值分解 二、应用特征值分解对图片进行压缩 三、矩阵的奇异值分解 四、应用奇异值分解对图片进行压缩 五、MATLAB仿真代码 前言 学习了特征值分解和奇异值分解相关知识&#xff0c;发现其可以用于图片…

初次使用住宅代理有哪些常见误区?

随着网络技术的发展&#xff0c;住宅代理因其高匿名性和稳定性成为许多用户进行网络活动的首选工具。然而&#xff0c;对于新手而言&#xff0c;使用住宅代理时往往容易陷入一些误区&#xff0c;这不仅可能影响使用效果&#xff0c;还可能带来安全风险。本文将探讨新手在使用住…

我与Linux的爱恋:yum和vim以及gcc的使用

​ ​ &#x1f525;个人主页&#xff1a;guoguoqiang. &#x1f525;专栏&#xff1a;Linux的学习 文章目录 ​1.Linux软件包管理器yum2.Linux开发工具3.Linux编译器 vimvim的基本概念vim的基本操作vim正常模式命令集vim末行模式命令集vim操作总结批量化注释批量化去注释简…

【数据库中级】1_DBeaver操作数据库

文章目录 一、连接数据库1.1 命令行连接数据库1.2 DBeaver工具连接数据库 二、DBeaver操作数据库2.1 通过DBeaver操作数据库2.2 通过DBeaver操作表2.3 通过DBeaver操作数据 三、DBeaver界面3.1 SQL编辑区3.2 导航区3.3 修改字体大小 一、连接数据库 1.1 命令行连接数据库 命令…

《Learning to Count without Annotations》CVPR2024

摘要 论文提出了一种名为UnCounTR的模型&#xff0c;该模型能够在没有任何手动标注的情况下学习进行基于参考的对象计数。这是通过构建“Self-Collages”&#xff08;自我拼贴画&#xff09;实现的&#xff0c;即在背景图像上粘贴不同对象的图像作为训练样本&#xff0c;提供覆…

【Git 学习笔记_24】Git 实用冷门操作技巧(四)—— 更多实用 git 别名设置、交互式新增提交

文章目录 11.8 更多别名设置别名1&#xff1a;只查看当前分支&#xff08;git b&#xff09;别名2&#xff1a;以图表形式显示自定义格式的 git 日志&#xff08;git graph&#xff09;别名3&#xff1a;查看由于合并分支导致的冲突后仍有冲突的、待合并的文件列表&#xff08;…

Excel文档的读取(1)

熟悉使用Excel的同学应该都知道&#xff0c;在单个Excel表格里想要分商品计算总销售额&#xff0c;使用数据透视表也可以非常快速方便的获得结果。但当有非常大量的Excel文件需要处理时&#xff0c;每一个Excel文件单独去做数据透视也会消耗大量的时间。就算使用Power Query这样…

python容器3——字典

&#xff08;1&#xff09; 什么是字典 字典&#xff1a;python中使用关键字dict表示 字典中允许通过key:value键值对的方式存储数据&#xff0c;让数据的管理更加友好&#xff01; 如图&#xff1a; 字典是一个哈希结构 (传入一个值算出内存地址&#xff0c;将该值保存在该…

产品经理就业

供需关系 1.需求分析核心价值是? 将真实的用户需求分析得到与之匹配的产品方案(功能) 2.Y模型的主要内容及其侧重点? 1)用户需求、2)目标动机、3)产品功能、4)人性(马斯洛需求) 1-2-4侧重深入想清楚需求本质 Why、4 -2-3 侧重浅出 How 结果输出 3.可以从哪些角度做好需求分析…

java.lang.IndexOutOfBoundsException: setSpan ( 0...x ) ends beyond length X

1&#xff0c;可能是EditText&#xff0c;setSelection(x)时超过了 输入框内容的实际长度导致的。 2&#xff0c;手机开启“拼写检查功能”&#xff0c;EditText设置了最大长度&#xff0c;选择提示的某一项文案时超过设置的最大长度限制&#xff0c;导致崩溃。 针对情况2 开…

【电子通识】洁净度等级划分及等级标准

洁净度常用于评估半导体、生物制药、医疗、实验室及科研院所、新能源等领域的洁净室、无尘室或者无菌室等环境。 一般来说&#xff0c;晶圆光刻、制造、测试等级为100级或1000级的洁净间&#xff0c;百级洁净间要求空气中0.5微米的尘埃粒子数不得超过每立方米3520个&#xff1b…

移动UI:新手指引页面,跟着指引不迷路。

移动端新手指引在提供用户引导、提升用户体验、提高用户留存率、促进功能使用和降低用户流失率方面都有积极的作用。 移动端新手指引在应用程序或移动网站中有以下几个作用&#xff1a; 1. 提供用户引导&#xff1a; 新手指引可以帮助用户快速了解应用程序或移动网站的功能和…

【网络安全】Exif 数据储存型XSS

未经许可,不得转载。 文章目录 Exif步骤Exif EXIF(Exchangeable Image File Format)数据是一种存储在图像文件中的元数据格式,常用于数码照片和扫描图像。它包含了与图像相关的各种信息,比如拍摄日期和时间、相机品牌和型号、拍摄时的设置(如曝光时间、光圈、ISO等)、地…