RabbitMQ 高级特性:从 TTL 到消息分发的全面解析 (下)

news2025/3/11 17:20:46

RabbitMQ高级特性

 RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)-CSDN博客

RabbitMQ 高级特性:从 TTL 到消息分发的全面解析 (下)-CSDN博客


引言

RabbitMQ 作为一款强大的消息队列中间件,在分布式系统中发挥着至关重要的作用。除了基本的消息收发功能外,它还具备许多高级特性,如 TTL、死信队列、延迟队列、事务和消息分发等。本文将详细介绍这些高级特性。


1. TTL(Time to Live,过期时间)

1.1 概念

TTL 即过期时间,RabbitMQ 可以对消息和队列设置 TTL。当消息到达存活时间之后,还没有被消费,就会被自动清除。这在很多业务场景中都非常有用,比如网上购物时,下单超过 24 小时未付款,订单会被自动取消;申请退款之后,超过 7 天未被处理,则自动退款。

1.2 设置消息的 TTL

有两种方法可以设置消息的 TTL,一是设置队列的 TTL,队列中所有消息都有相同的过期时间;二是对消息本身进行单独设置,每条消息的 TTL 可以不同。如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。

1.2.1 针对每条消息设置 TTL

针对每条消息设置 TTL 的方法是在发送消息的方法中加入 expiration 的属性参数,单位为毫秒。

配置交换机和队列

// TTL
public static final String TTL_QUEUE = "ttl_queue";
public static final String TTL_EXCHANGE_NAME = "ttl_exchange";

// 1. 交换机
@Bean("ttlExchange")
public Exchange ttlExchange() {
    return ExchangeBuilder.fanoutExchange(Constant.TTL_EXCHANGE_NAME).durable(true).build();
}

// 2. 队列
@Bean("ttlQueue")
public Queue ttlQueue() {
    return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}

// 3. 队列和交换机绑定 Binding
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlExchange") FanoutExchange exchange, 
                          @Qualifier("ttlQueue") Queue queue) {
    return BindingBuilder.bind(queue).to(exchange);
}

发送消息

@RequestMapping("/ttl")
public String ttl() {
    String ttlTime = "10000"; // 10s
    rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...", messagePostProcessor -> {
        messagePostProcessor.getMessageProperties().setExpiration(ttlTime);
        return messagePostProcessor;
    });
    return "发送成功!";
}

运行结果
调用接口发送消息后,可以看到 Ready 消息为 1。10 秒钟之后,刷新页面,发现消息已被删除。如果不设置 TTL,则表示此消息不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。

1.3 设置队列的 TTL

设置队列 TTL 的方法是在创建队列时,加入 x-message-ttl 参数实现的,单位是毫秒。

配置队列和绑定关系

public static final String TTL_QUEUE2 = "ttl_queue2";

// 设置 ttl
@Bean("ttlQueue2")
public Queue ttlQueue2() {
    // 设置 20 秒过期
    return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20 * 1000).build();
}

// 3. 队列和交换机绑定 Binding
@Bean("ttlBinding2")
public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange, 
                           @Qualifier("ttlQueue2") Queue queue) {
    return BindingBuilder.bind(queue).to(exchange);
}

发送消息

@RequestMapping("/ttl")
public String ttl() {
    // 发送不带 ttl 的消息
    rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...");
    return "发送成功!";
}

运行结果
运行之后发现,新增了一个队列,队列 Features 有一个 TTL 标识。调用接口发送消息后,可以看到 Ready 消息为 1。采用发布订阅模式,所有与该交换机绑定的队列(ttl_queue 和 ttl_queue2)都会收到消息。20 秒钟之后,刷新页面,发现 ttl_queue2 中的消息已被删除,由于 ttl_queue 队列未设置过期时间,所以该队列的消息未删除。

1.4 两者区别

设置队列 TTL 属性的方法,一旦消息过期,就会从队列中删除;设置消息 TTL 的方法,即使消息过期,也不会马上从队列中删除,而是在即将投递到消费者之前进行判定的。

这是因为设置队列过期时间,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。

而设置消息 TTL 的方式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。


2. 死信队列

2.1 死信的概念

死信(dead message)简单理解就是因为种种原因,无法被消费的信息,就是死信。当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX(Dead Letter Exchange),绑定 DLX 的队列,就称为死信队列(Dead Letter Queue,简称 DLQ)。

消息变成死信一般是由于以下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false
  • 消息过期。
  • 队列达到最大长度。

2.2 代码示例

2.2.1 声明队列和交换机

包含两部分:声明正常的队列和正常的交换机;声明死信队列和死信交换机。死信交换机和死信队列和普通的交换机、队列没有区别。

// 死信队列
public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";

@Configuration
public class DLXConfig {
    // 死信交换机
    @Bean("dlxExchange")
    public Exchange dlxExchange() {
        return ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build();
    }

    // 2. 死信队列
    @Bean("dlxQueue")
    public Queue dlxQueue() {
        return QueueBuilder.durable(Constant.DLX_QUEUE).build();
    }

    // 3. 死信队列和交换机绑定 Binding
    @Bean("dlxBinding")
    public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, 
                              @Qualifier("dlxQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
    }

    // 正常交换机
    @Bean("normalExchange")
    public Exchange normalExchange() {
        return ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).build();
    }

    // 正常队列
    @Bean("normalQueue")
    public Queue normalQueue() {
        return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
    }

    // 正常队列和交换机绑定 Binding
    @Bean("normalBinding")
    public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, 
                                 @Qualifier("normalQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
    }
}
2.2.2 正常队列绑定死信交换机

当这个队列中存在死信时,RabbitMQ 会自动的把这个消息重新发布到设置的 DLX 上,进而被路由到另一个队列,即死信队列。可以监听这个死信队列中的消息以进行相应的处理。

@Bean("normalQueue")
public Queue normalQueue() {
    return QueueBuilder.durable(Constant.NORMAL_QUEUE)
           .deadLetterExchange(Constant.DLX_EXCHANGE_NAME)
           .deadLetterRoutingKey("dlx").build();
}
2.2.3 制造死信产生的条件
@Bean("normalQueue")
public Queue normalQueue() {
    return QueueBuilder.durable(Constant.NORMAL_QUEUE)
           .deadLetterExchange(Constant.DLX_EXCHANGE_NAME)
           .deadLetterRoutingKey("dlx")
           .ttl(10 * 1000)
           .maxLength(10L)
           .build();
}
2.2.4 发送消息
@RequestMapping("/dlx")
public void dlx() {
    // 测试过期时间,当时间达到 TTL,消息自动进入到死信队列
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
    // 测试队列长度
    // for (int i = 0; i < 20; i++) {
    //     rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
    // }
    // 测试消息拒收
    // rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}
2.2.5 测试死信
  • 程序启动之后,观察队列:队列 Features 说明:D 是 durable 的缩写,表示设置持久化;TTL 表示 Time to Live,队列设置了 TTL;Lim 表示队列设置了长度(x-max-length);DLX 表示队列设置了死信交换机(x-dead-letter-exchange);DLK 表示队列设置了死信 RoutingKeyx-dead-letter-routing-key)。
  • 测试过期时间:调用接口发送消息,10 秒后,消息进入到死信队列。生产者首先发送一条消息,然后经过交换器(normal_exchange)顺利地存储到队列(normal_queue)中。由于队列 normal_queue 设置了过期时间为 10s,在这 10s 内没有消费者消费这条消息,那么判定这条消息过期。由于设置了 DLX,过期之时,消息会被丢给交换器(dlx_exchange)中,这时根据 RoutingKey 匹配,找到匹配的队列(dlx_queue),最后消息被存储在 queue.dlx 这个死信队列中。
  • 测试达到队列长度:队列长度设置为 10,我们发送 20 条数据,会有 10 条数据直接进入到死信队列。发送前,死信队列只有一条数据,发送 20 条消息后,运行后可以看到死信队列变成了 11 条。过期之后,正常队列的 10 条也会进入到死信队列。
  • 测试消息拒收:写消费者代码,并强制异常,测试拒绝签收。

@Component
public class DlxQueueListener {
    // 指定监听队列的名称
    @RabbitListener(queues = Constant.NORMAL_QUEUE)
    public void ListenerQueue(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), 
                              message.getMessageProperties().getDeliveryTag());
            // 模拟处理失败
            int num = 3 / 0;
            System.out.println("处理完成");
            // 3. 手动签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // 4. 异常了就拒绝签收
            Thread.sleep(1000);
            // 第三个参数 requeue,是否重新发送,如果为 true,则会重新发送,若为 false,则直接丢弃,若设置死信,会进入到死信队列
            channel.basicNack(deliveryTag, true, false);
        }
    }

    // 指定监听队列的名称
    @RabbitListener(queues = Constant.DLX_QUEUE)
    public void ListenerDLXQueue(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("死信队列接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), 
                          message.getMessageProperties().getDeliveryTag());
    }
}

发送消息,观察运行结果:

接收到消息: dlx test..., deliveryTag: 1
死信队列接收到消息: dlx test..., deliveryTag: 1

2.3 常见问题

  • 死信队列的概念:死信(Dead Letter)是消息队列中的一种特殊消息,它指的是那些无法被正常消费或处理的消息。在消息队列系统中,如 RabbitMQ,死信队列用于存储这些死信消息。
  • 死信的来源
    • 消息过期:消息在队列中存活的时间超过了设定的 TTL。
    • 消息被拒绝:消费者在处理消息时,可能因为消息内容错误、处理逻辑异常等原因拒绝处理该消息。如果拒绝时指定不重新入队(requeue=false),消息也会成为死信。
    • 队列满了:当队列达到最大长度,无法再容纳新的消息时,新来的消息会被处理为死信。
  • 死信队列的应用场景:对于 RabbitMQ 来说,死信队列是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。比如:用户支付订单之后,支付系统会给订单系统返回当前订单的支付状态。为了保证支付信息不丢失,需要使用到死信队列机制。当消息消费异常时,将消息投入到死信队列中,由订单系统的其他消费者来监听这个队列,并对数据进行处理(比如发送工单等,进行人工确认)。其他应用场景还有消息重试、消息丢弃、日志收集等。

3. 延迟队列

3.1 概念

延迟队列(Delayed Queue),即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

3.2 应用场景

延迟队列的使用场景有很多,比如:

  • 智能家居:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。
  • 日常管理:预定会议后,需要在会议开始前十五分钟提醒参会人参加会议。
  • 用户注册成功后,7 天后发送短信,提高用户活跃度等。

3.3 TTL + 死信队列实现

RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 TTL + 死信队列的方式组合模拟出延迟队列的功能。

代码实现

声明队列

// 正常队列
@Bean("normalQueue")
public Queue normalQueue() {
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME); // 绑定死信队列
    arguments.put("x-dead-letter-routing-key", "dlx"); // 设置发送给死信队列的 RoutingKey
    return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
}

生产者:发送两条消息,一条消息 10s 后过期,第二条 20s 后过期。

@RequestMapping("/delay")
public String delay() {
    // 发送带 ttl 的消息
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..." + new Date(), messagePostProcessor -> {
        messagePostProcessor.getMessageProperties().setExpiration("10000"); // 10s 过期
        return messagePostProcessor;
    });
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..." + new Date(), messagePostProcessor -> {
        messagePostProcessor.getMessageProperties().setExpiration("20000"); // 20s 过期
        return messagePostProcessor;
    });
    return "发送成功!";
}

消费者

// 指定监听队列的名称
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    System.out.printf ("% tc 死信队列接收到消息: % s, deliveryTag: % d% n", new Date (), new String (message.getBody (),"UTF-8"),
message.getMessageProperties ().getDeliveryTag ());
}

**运行程序**:调用接口发送数据`http://127.0.0.1:8080/product/delay`,通过控制台观察死信队列消费情况:
周三 5 月 22 11:59:00 CST 2024 死信队列接收到消息: ttl test 10s...Wed May 22 11:58:50 CST 2024, deliveryTag: 1
周三 5 月 22 11:59:10 CST 2024 死信队列接收到消息: ttl test 20s...Wed May 22 11:58:50 CST 2024, deliveryTag: 2
可以看到,两条消息按照过期时间依次进入了死信队列。延迟队列,就是希望等待特定的时间之后,消费者才能拿到这个消息。TTL刚好可以让消息延迟一段时间成为死信,成为死信的消息会被投递到死信队列里,这样消费者一直消费死信队列里的消息就可以了。

**存在问题**:当把生产消息的顺序修改为先发送20s过期数据,再发送10s过期数据时:
```java
@RequestMapping("/delay")
public String delay() {
    // 发送带ttl的消息
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", 
    "ttl test 20s..."+new Date(), messagePostProcessor -> {
        messagePostProcessor.getMessageProperties().setExpiration("20000"); 
        return messagePostProcessor;
    });
    rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", 
    "ttl test 10s..."+new Date(), messagePostProcessor -> {
        messagePostProcessor.getMessageProperties().setExpiration("10000"); 
        return messagePostProcessor;
    });
    return "发送成功!";
}

通过控制台观察死信队列消费情况:

周三 5月 22 12:14:22 CST 2024 死信队列接收到消息: ttl test 20s...Wed May 22 12:14:02 CST 2024, deliveryTag: 3
周三 5月 22 12:14:22 CST 2024 死信队列接收到消息: ttl test 10s...Wed May 22 12:14:02 CST 2024, deliveryTag: 4

这时会发现:10s 过期的消息,也是在 20s 后才进入到死信队列。这是因为 RabbitMQ 只会检查队首消息是否过期,如果过期则丢到死信队列。如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行。所以在考虑使用 TTL + 死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

3.4 延迟队列插件

RabbitMQ 官方提供了一个延迟的插件来实现延迟的功能。
安装延迟队列插件

  1. 下载并上传插件:根据自己的 RabbitMQ 版本从插件下载地址选择相应版本的延迟插件,下载后上传到服务器。插件上传目录参考installing Additional Plugins | RabbitMQ,/usr/lib/rabbitmq/plugins是一个附加目录,RabbitMQ 包本身不会在此安装任何内容,如果没有这个路径,可以自己进行创建。如果为 docker 操作,使用docker cp命令复制文件到 docker 容器,例如:docker cp 宿主机文件 容器名称或ID:容器目录
  2. 启动插件:在服务器命令行中,使用rabbitmq - plugins list查看插件列表,使用rabbitmq - plugins enable rabbitmq_delayed_message_exchange启动插件,之后重启 RabbitMQ 服务。如果为 docker 操作,进入容器后同样执行这两个命令来查看和启动插件,最后重启 docker 容器。
  3. 验证插件:在 RabbitMQ 管理平台查看,新建交换机时是否有延迟消息选项,如果有就说明延迟消息插件已经正常运行了。

基于插件延迟队列实现

声明交换机、队列、绑定关系

import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayedConfig {
    @Bean("delayedExchange")
    public Exchange delayedExchange() {
        return ExchangeBuilder.directExchange(Constant.DELAYED_EXCHANGE_NAME).durable(true).delayed().build();
    }
    //2. 队列
    @Bean("delayedQueue")
    public Queue delayedQueue() {
        return QueueBuilder.durable(Constant.DELAYED_QUEUE).build();
    }
    //3. 队列和交换机绑定 Binding
    @Bean("delayedBinding")
    public Binding delayedBinding(@Qualifier("delayedExchange") Exchange exchange, 
                                  @Qualifier("delayedQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("delayed").noargs();
    }
}

生产者:发送两条消息,并设置延迟时间。

@RequestMapping("/delay2")
public String delay2() {
    // 发送带ttl的消息
    rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
    "delayed test 20s..."+new Date(), messagePostProcessor -> {
        messagePostProcessor.getMessageProperties().setDelayLong(20000L); 
        return messagePostProcessor;
    });
    rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
    "delayed test 10s..."+new Date(), messagePostProcessor -> {
        messagePostProcessor.getMessageProperties().setDelayLong(10000L); 
        return messagePostProcessor;
    });
    return "发送成功!";
}

消费者

import com.bite.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;

@Component
public class DelayedQueueListener {
    //指定监听队列的名称
    @RabbitListener(queues = Constant.DELAYED_QUEUE)
    public void ListenerDLXQueue(Message message, Channel channel) throws Exception {
        System.out.printf("%tc 死信队列接收到消息: %s%n", new Date(), new String(message.getBody(),"UTF-8"));
    }
}

运行程序,并测试:程序启动后,调用接口发送消息http://127.0.0.1:8080/product/delay2,观察控制台:

周三 5月 22 15:42:02 CST 2024 死信队列接收到消息: delayed test 10s...Wed May 22 15:41:52 CST 2024
周三 5月 22 15:42:12 CST 2024 死信队列接收到消息: delayed test 20s...Wed May 22 15:41:52 CST 2024

从结果可以看出,使用延迟队列,可以保证消息按照延迟时间到达消费者。

介绍下 RabbitMQ 的延迟队列:延迟队列是一个特殊的队列,消息发送之后,并不立即给消费者,而是等待特定的时间,才发送给消费者。延迟队列的应用场景有很多,比如订单在十分钟内未支付自动取消、用户注册成功后 3 天后发调查问卷、用户发起退款 24 小时后商家未处理则默认同意自动退款等。但 RabbitMQ 本身并没直接实现延迟队列,通常有两种方法:

  1. TTL + 死信队列组合的方式:优点是灵活不需要额外的插件支持;缺点是存在消息顺序问题,需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性。
  2. 使用官方提供的延迟插件实现延迟功能:优点是通过插件可以直接创建延迟队列,简化延迟消息的实现,避免了 DLX 的时序问题;缺点是需要依赖特定的插件,有运维工作,只适用于特定版本。

4. 事务

RabbitMQ 是基于 AMQP 协议实现的,该协议实现了事务机制,因此 RabbitMQ 也支持事务机制。Spring AMQP 也提供了对事务相关的操作。RabbitMQ 事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败。

4.1 配置事务管理器

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TransactionConfig {
    @Bean
    public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }
}

4.2 声明队列

@Bean("transQueue")
public Queue transQueue() {
    return QueueBuilder.durable("trans_queue").build();
}

4.3 生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/trans")
@RestController
public class TransactionProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    @RequestMapping("/send")
    public String send() {
        rabbitTemplate.convertAndSend("","trans_queue", "trans test 1...");
        int a = 5/0;
        rabbitTemplate.convertAndSend("","trans_queue", "trans test 2...");
        return "发送成功";
    }
}

4.4 测试

  1. 不加@Transactional,会发现消息 1 发送成功。
  2. 添加@Transactional,消息 1 和消息 2 全部发送失败,体现了事务的原子性。

5. 消息分发

5.1 概念

RabbitMQ 队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅列表里的一个消费者。默认情况下,RabbitMQ 是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息。这种方式不太合理,比如某些消费者消费速度慢,而某些消费者消费速度快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

我们可以使用channel.basicQos(int prefetchCount)方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量。比如消费端调用了channel.basicQos(5),RabbitMQ 会为该消费者计数,发送一条消息计数 + 1,消费一条消息计数 - 1,当达到了设定的上限,RabbitMQ 就不会再向它发送消息了,直到消费者确认了某条消息,类似 TCP/IP 中的 “滑动窗口”。prefetchCount设置为 0 时表示没有上限,basicQos对拉模式的消费无效。

5.2 应用场景

5.2.1 限流

在订单系统中,正常情况下每秒可处理 5000 请求,但在秒杀时请求瞬间达每秒 1 万个,若全部通过 MQ 发送会压垮订单系统。通过设置prefetchCount参数并将消息应答方式设为手动应答,可实现限流。

配置 prefetch 参数和应答方式

listener:
    simple:
        acknowledge-mode: manual
        prefetch: 5

配置交换机和队列

import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QosConfig {
    @Bean("qosExchange")
    public Exchange qosExchange() {
        return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE_NAME).durable(true).build();
    }
    //2. 队列
    @Bean("qosQueue")
    public Queue qosQueue() {
        return QueueBuilder.durable(Constant.QOS_QUEUE).build();
    }
    //3. 队列和交换机绑定 Binding
    @Bean("qosBinding")
    public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, 
                              @Qualifier("qosQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
    }
}

发送消息:一次发送 20 条消息。

@RequestMapping("/qos")
public String qos() {
    for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE_NAME, "qos", "qos test..."+i);
    }
    return "发送成功!";
}

消费者监听

@Component
public class QosQueueListener {
    @RabbitListener(queues = Constant.QOS_QUEUE)
    public void ListenerQueue(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), deliveryTag);
    }
}

测试:调用接口发送消息,控制台只打印 5 条消息,管理平台显示待发送 15 条,未确认 5 条。取消prefetch配置,消费者会一次性接收 20 条消息。

5.2.2 负载均衡

在有两个消费者的情况下,若一个消费者处理任务快,一个慢,会导致负载不均衡。通过设置prefetch = 1,可让 RabbitMQ 一次只给一个消费者一条消息,处理并确认前一条消息后再发送新消息,实现负载均衡。

配置 prefetch 参数和应答方式

listener:
    simple:
        acknowledge-mode: manual
        prefetch: 1

启动两个消费者:使用Thread.sleep(100)模拟消费慢。

@Component
public class QosQueueListener {
    @RabbitListener(queues = Constant.QOS_QUEUE)
    public void ListenerQosQueue(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), deliveryTag);
        channel.basicAck(deliveryTag, true);
    }

    @RabbitListener(queues = Constant.QOS_QUEUE)
    public void ListenerQueue2(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("消费者2接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), deliveryTag);
        Thread.sleep(100);
        channel.basicAck(deliveryTag, true);
    }
}

测试:调用接口发送消息,通过日志观察两个消费者消费消息情况,可看到消息在两个消费者间均衡分配。

感谢阅览。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2313316.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

表格columns拼接两个后端返回的字段(以umi框架为例)

在用组件对前端项目进行开发时&#xff0c;我们会遇到以下情况&#xff1a;项目原型中有取值范围这个表字段&#xff0c;需要存放最小取值到最大取值。 而后端返回给我们的数据是返回了一个最小值和一个最大值&#xff0c; 在columns中我们需要对这两个字段进行拼接&#xff0…

sparkTTS window 安装

SparkTTS 的简介 Spark-TTS是一种基于SpardAudio团队提出的 BiCodec 构建的新系统&#xff0c;BiCodec 是一种单流语音编解码器&#xff0c;可将语音策略性地分解为两种互补的标记类型&#xff1a;用于语言内容的低比特率语义标记和用于说话者特定属性的固定长度全局标记。这种…

【K8S系列】深入探究Kubernetes中查看日志的方法

在Kubernetes&#xff08;简称K8s&#xff09;的世界里&#xff0c;日志是诊断和排查问题的关键线索。无论是应用程序的运行状态、错误信息&#xff0c;还是系统的健康状况&#xff0c;都能从日志中找到蛛丝马迹。本文将详细介绍在K8s中查看日志的各种方法&#xff0c;从基础的…

JmeterHttp请求头管理出现Unsupported Media Type问题解决

JmeterHttp请求头管理出现Unsupported Media Type问题解决 大多数的app与pc端压测的时候都会出现这种情况 当我们在jemter测试当中当中遇见Unsupported Media Type&#xff0c;有一种可能就是我们请求的网页的content-Type的类型与我们测试的时候的类型不一致 解决方法 可以添…

十大数据科学Python库

十大数据科学Python库 1、NumPy&#xff1a;脊髓2、Pandas&#xff1a;数据操纵专家3、Matplotlib&#xff1a;艺术之魂4、Scikit-Learn&#xff1a;瑞士军刀5、TensorFlow&#xff1a;聪明的家伙6、PyTorch&#xff1a;叛逆者7、Selenium&#xff1a;操纵大师8、NLTK&#xff…

LabVIEW伺服阀高频振动测试

在伺服阀高频振动测试中&#xff0c;闭环控制系统的实时性与稳定性至关重要。针对用户提出的1kHz控制频率需求及Windows平台兼容性问题&#xff0c;本文重点分析NI PCIe-7842R实时扩展卡的功能与局限性&#xff0c;并提供其他替代方案的综合对比&#xff0c;以帮助用户选择适合…

解决asp.net mvc发布到iis下安全问题

解决asp.net mvc发布到iis下安全问题 环境信息1.The web/application server is leaking version information via the "Server" HTTP response2.确保您的Web服务器、应用程序服务器、负载均衡器等已配置为强制执行Strict-Transport-Security。3.在HTML提交表单中找不…

CSS-基础选择器,字体属性,文本属性介绍

一、CSS 简介 CSS 是层叠样式表 ( Cascading Style Sheets ) 的简称. 有时我们也会称之为 CSS 样式表或级联样式表&#xff61; CSS 是也是一种标记语言 CSS 主要用于设置 HTML 页面中的文本内容(字体&#xff64;大小&#xff64;对齐方式等)&#xff64;图片的外形(宽高&a…

vtkDepthSortPolyData 根据相机视图方向对多边形数据进行排序

1. 作用 在 3D 渲染中&#xff0c;透明对象的渲染顺序非常重要。如果透明对象的渲染顺序不正确&#xff0c;可能会导致错误的视觉效果&#xff08;例如&#xff0c;远处的透明对象遮挡了近处的透明对象&#xff09;。vtkDepthSortPolyData 通过对多边形数据进行深度排序&#…

【MySQL_04】数据库基本操作(用户管理--配置文件--远程连接--数据库信息查看、创建、删除)

文章目录 一、MySQL 用户管理1.1 用户管理1.11 mysql.user表详解1.12 添加用户1.13 修改用户权限1.14 删除用户1.15 密码问题 二、MySQL 配置文件2.1 配置文件位置2.2 配置文件结构2.3 常用配置参数 三、MySQL远程连接四、数据库的查看、创建、删除4.1 查看数据库4.2 创建、删除…

牛客网刷题(5)(HTML之元素<input>、表格<table>与描述列表<dl>、元素<label>)

目录 一、哪种输入类型定义滑块控件&#xff1f;元素&#xff08;input&#xff09; &#xff08;1&#xff09;官方解析。 &#xff08;2&#xff09;总结。 &#xff08;3&#xff09;牛客大佬总结。 &#xff08;4&#xff09;HTML5——元素&#xff08;input&#xff09;的…

IDEA(十一)调整新版本的工具栏显示Git操作(pull、commit、push、revert等)

目录 一、背景二、操作步骤2.1 开启新 UI 样式2.2 设置 Tool Window 工具栏 一、背景 好久没有更新 IDEA 了&#xff0c;更新之后发现 IDEA 的工具栏消失了。一番操作之后&#xff0c;终于把 IDEA 的工具栏的设置调整好了&#xff0c;在此进行记录调整步骤&#xff0c;供大家学…

C++编程:进阶阶段—4.2对象

目录 4.2 对象特征 4.2.1 构造函数和析构函数 4.2.2 构造函数的分类 4.2.3 拷贝函数调用时机 4.2.4 构造函数调用规则 4.2.5 深拷贝与浅拷贝 4.2.6 初始化列表 4.2.7 类对象作为类成员 4.2.8 静态成员 4.2.9 成员变量和成员函数的存储 4.2.10 this指针 4.2.11 空指针…

TensorFlow.js 全面解析:在浏览器中构建机器学习应用

TensorFlow.js 全面解析&#xff1a;在浏览器中构建机器学习应用 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;可以分享一下给大家。点击跳转到网站。 https://www.captainbed.cn/ccc 文章目录 TensorFlow.js 全面解析&#x…

CI/CD—Jenkins配置Poll SCM触发自动构建

Poll SCM简介 在 Jenkins 等持续集成工具中&#xff0c;“Poll SCM” 是一种用于轮询软件配置管理&#xff08;SCM&#xff09;系统以检查代码变更的机制&#xff0c;以下是对它的详细介绍&#xff1a; 作用 “Poll SCM” 允许 Jenkins 定期检查指定的 SCM 系统&#xff08;如 …

AI与SEO关键词智能解析

内容概要 人工智能技术正重塑搜索引擎优化的底层逻辑&#xff0c;其核心突破体现在关键词解析维度的结构性升级。通过机器学习算法对海量搜索数据的动态学习&#xff0c;AI不仅能够识别传统TF-IDF模型中的高频词汇&#xff0c;更能捕捉语义网络中隐含的关联特征。下表展示了传…

STM32之BKP

VBAT备用电源。接的时候和主电源共地&#xff0c;正极接在一起&#xff0c;中间连接一个100nf的电容。BKP是RAM存储器。 四组VDD都要接到3.3V的电源上&#xff0c;要使用备用电池&#xff0c;就把电池正极接到VBAT&#xff0c;负极跟主电源共地。 TEMPER引脚先加一个默认的上拉…

c++的基础排序算法

一、快速排序 1. 选择基准值&#xff08;Pivot&#xff09; 作用 &#xff1a;从数组中选择一个元素作为基准&#xff08;Pivot&#xff09;&#xff0c;用于划分数组。常见选择方式 &#xff1a; 固定选择最后一个元素&#xff08;如示例代码&#xff09;。随机选择&#xf…

基于Spring3的抽奖系统

注&#xff1a;项目git仓库地址&#xff1a;demo.lottery 小五Z/Spring items - 码云 - 开源中国 目录 注&#xff1a;项目git仓库地址&#xff1a;demo.lottery 小五Z/Spring items - 码云 - 开源中国 项目具体代码可参考仓库源码&#xff0c;本文只讲解重点代码逻辑 一…

基于qiime2的16S数据分析全流程:从导入数据到下游分析一条龙

目录 创建metadata 把数据导入qiime2 去除引物序列 双端合并 &#xff08;dada2不需要&#xff09; 质控 &#xff08;dada2不需要&#xff09; 使用deblur获得特征序列 使用dada2生成代表序列与特征表 物种鉴定 可视化物种鉴定结果 构建进化树&#xff08;ITS一般不构建进化树…