RabbitMQ队列

news2024/11/15 4:39:36

RabbitMQ队列

1、死信的概念

​ 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer 从 queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
​ 应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

1.1、死信的来源

  • 消息TTL过期;
  • 队列达到最大长度(队列满了,无法再添加数据到mq.中);
  • 消息被拒绝(basic.reject 或 basic.nack)并且requeue=false;

1.2、死信实战

架构图

image-20240301203605503

1.3、消费者01

//死信队列
public class DeadMessageConsumer01 {
    //普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //声明普通和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明普通队列,这一次需要使用参数了
        Map<String, Object> arguments = new HashMap<>();
        //这里就要设置死信到死信交换机上了
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信routingKey
        arguments.put("x-dead-letter-routing-key","deadKey");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //开始分别将死信和普通交换机与队列绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normalKey");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"deadKey");

        DeliverCallback deliverCallback = (consumerTag,message) ->{
            System.out.println("死信测试普通队列消费者01收到消息:"+new String(message.getBody()));
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag -> {});
    }

}

1.4、生产者

//死信队列的生产者
public class DeadMessageProducers {
    //普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //死信消息设置TTL过期时间,设置10秒
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();
        //发10条消息
        for (int i = 1; i < 11; i++) {
            String message = "dead"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"normalKey",basicProperties,message.getBytes());
            System.out.println("发送消息成功:"+ message);
        }
    }
}

​ 先把消费者启动,将交换机和队列先声明出来,然后关掉消费者再启动生产者,生产者一直发这10个消息,但是消费者已经停止了,没人消费,所以TTL一到期就会被放进死信队列里,我们在web管理界面就能看到,普通队列的10条消息很快就跑到死信队列中了。

image-20240302110147069

image-20240302111240369

​ 然后现在死信队列里有那10条消息,再来一个专门消费死信消息的消费者2,非常简单,只需要消费死信队列即可。

//死信队列
public class DeadMessageConsumer02 {
    //死信队列
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            System.out.println("死信测试普通队列消费者02收到消息:"+new String(message.getBody()));
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
    }
}

启动它消费死信队列

image-20240302111857175

消费完成,看看管理界面,死信消息归0

image-20240302111933926

1.5、队列达到最大长度的死信

​ 在原来的消费者基础上增加一条

//设置队列最大长度
arguments.put("x-max-length",6);

​ 原来的生产者也不需要给消息设置过期时间了直接发送消息即可。

​ 测试之前先将原来的普通队列删除再启动消费者

image-20240302134737969

​ 仍然把消费者停掉,生产者启动,发送10条消息6条正常4条超过了限制会被放到死信队列

image-20240302134915165

1.6、消息被拒死信队列

我们先把原来的消息全部消费掉或者直接删除原来的队列,再演示新的。

消费者做一些修改

DeliverCallback deliverCallback = (consumerTag,message) ->{
    String msg = new String(message.getBody());
    if (msg.equals("dead5")){
        System.out.println("拒绝第5个消息:"+ msg);
        //basicReject第二个参数是是否还放回队列
        channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
    } else {
        System.out.println("死信测试普通队列消费者01收到消息:" + msg);
        channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    }

};
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});

生产者发送10条消息,看到消费者01拒绝了第5个消息,其他的正常收到

image-20240302143440745

通过管理界面看到第5条消息放到了死信队列中

image-20240302143507611

启动专门消费死信队列的消费者02将第5个消息消费。

image-20240302143537357

2、延迟队列

2.1、延迟队列概念

​ 延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

其实,TTL过期导致的死信队列就是延迟队列

应用场景:

  • 1.订单在十分钟之内未支付则自动取消
  • ⒉新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  • 3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

​ 这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

​ 如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。

​ 但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

2.1.1、RabbitMQ 中的 TTL

​ TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间, 单位是毫秒。

​ 换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

  • 消息设置 TTL

    例如:

     rabbitTemplate.convertAndSend("X","XC",message,msg -> {
            //发送消息设置消息的TTL
            msg.getMessageProperties().setExpiration(ttl+"000");
            return msg;
        });
    
  • **队列设置 TTL **

    例如:在创建队列的时候设置队列的“x-message-ttl”属性

     //声明普通队列 TTL 为40秒
        @Bean("queueB")
        public Queue queueB(){
            HashMap<String, Object> map = new HashMap<>(3);
            //设置死信交换机
            map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey
            map.put("x-dead-letter-routing-key","YD");
            //设置过期时间,10秒
            map.put("x-message-ttl",40000);
            return QueueBuilder.durable(QUEUE_B).withArguments(map).build();
        }
    

2.2、整合SpringBoot

添加依赖项


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- 添加 Spring Boot 版本 -->
        <spring.boot.version>2.3.4.RELEASE</spring.boot.version>
        <!-- 添加 Lombok 版本 -->
        <lombok.version>1.18.20</lombok.version>
        <!-- 添加 Spring AMQP 版本 -->
        <spring.amqp.version>2.1.4.RELEASE</spring.amqp.version>
        <!-- 添加 Springfox Swagger 版本 -->
        <springfox.version>2.9.2</springfox.version>
    </properties>

    <dependencies>
        <!-- RabbitMQ 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>${spring.boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${spring.boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>${spring.boot.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.taobao.arthas</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80-fix</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
        <!-- Swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>${springfox.version}</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>${springfox.version}</version>
        </dependency>
        <!-- RabbitMQ 测试依赖 -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <version>${spring.amqp.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

创建application.properties文件

spring.rabbitmq.host=192.168.111.28
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.redis.password=123

swagger2的配置类SwaggerConfig

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig(){
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }
    private ApiInfo webApiInfo(){
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("zm", "http://zmblog.vip",
                        "3339332352@qq.com"))
                .build();
    }
}

2.3、队列TTL

​ 创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

image-20240302163555580

2.4、配置文件类代码

TtlQueueConfig

//TTL队列,配置类代码
@Configuration
public class TtlQueueConfig {
    //普通交换机名称
    public static final String X_EXCHANGE = "X";
    //死信交换机名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列名称,两个
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信队列名称
    public static final String DEAD_LETTER_QUEUE = "QD";

    //声明普通交换机X_EXCHANGE,起一个别明xExchange注入bean
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    //声明死信交换机Y_EXCHANGE,起一个别明yExchange注入bean
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    //声明普通队列 TTL 为10秒
    @Bean("queueA")
    public Queue queueA(){
        HashMap<String, Object> map = new HashMap<>(3);
        //设置死信交换机
        map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey
        map.put("x-dead-letter-routing-key","YD");
        //设置过期时间,10秒
        map.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
    }
    //声明普通队列 TTL 为40秒
    @Bean("queueB")
    public Queue queueB(){
        HashMap<String, Object> map = new HashMap<>(3);
        //设置死信交换机
        map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey
        map.put("x-dead-letter-routing-key","YD");
        //设置过期时间,10秒
        map.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(map).build();
    }
    //死信队列
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }
    //绑定
    @Bean
    public Binding QABindingX(@Qualifier("queueA") Queue queueA,
                              @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean
    public Binding QBBindingX(@Qualifier("queueB") Queue queueB,
                              @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean
    public Binding QDBindingY(@Qualifier("queueD") Queue queueD,
                              @Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

生产者controller,DeadLetterProducers

//生产者发送延迟消息
@Slf4j
@RestController
public class DeadLetterProducers {
    //自动注入
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自于TTL为10秒的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自于TTL为40秒的队列:"+message);
    }
}

消费者:

//消费者
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接收消息,定义一个监听器来监听死信队列中的消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel){
        String s = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),s);
    }

}

成功启动

image-20240302203312306

​ 在浏览器上输入localhost:8080/sendMsg/哈哈哈哈哈

​ 发现后台开始输出信息,先是发送的提示信息,然后过了10秒消费者收到了一个信息,然后再过30秒收到了第二条信息。

image-20240302203706023

​ 结果是没有问题的,第一条消息在10S后变成了死信消息,然后被消费者消费掉,第二条消息在40S之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

​ 不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

2.5、延迟队列优化

​ 在这里新增了一个队列 QC,它是通用的延迟队列,生产者掌握时间(这是设定消息的过期时间,之前是设定队列的过期时间),绑定关系如下,该队列不设置 TTL 时间

image-20240303104749887

我们让生产者来决定延迟队列的时间,需要多少就指定多少这样更符合需求。

在TtlQueueConfig类中添加内容

public static final String QUEUE_C = "QC";
//新加一个通用TTL的普通队列
    @Bean("queueC")
    public Queue queueC(){
        HashMap<String, Object> map = new HashMap<>(2);
        //设置死信交换机
        map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey
        map.put("x-dead-letter-routing-key","YD");
        //TTL就交给生产者指定了这里就不需要写了
        return QueueBuilder.durable(QUEUE_C).withArguments(map).build();
    }
//QC绑定普通交换机
 @Bean
    public Binding QCBindingX(@Qualifier("queueC") Queue queueC,
                              @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

生产者的controller添加一个请求

//发指定TTL的消息
@GetMapping("/sendExpMsg/{message}/{ttl}")
public void sendExpMsg(@PathVariable String message,@PathVariable String ttl){
    log.info("当前时间:{},发送一条时长为:{}毫秒的队列消息给队列QC:{}",new Date(),ttl,message);
    rabbitTemplate.convertAndSend("X","XC",message,msg -> {
        //发送消息设置消息的TTL
        msg.getMessageProperties().setExpiration(ttl+"000");
        return msg;
    });
}

​ 我们启动服务,浏览器上分别输入localhost:8080/sendExpMsg/这是第一条信息/20,localhost:8080/sendExpMsg/这是第二条信息/2

image-20240303121301599

​ 我们发现,结果并不是我们所设想的那样,延时20秒的和2秒的两条消息都是在20秒时接收到,同时接收了,而这个顺序也是按照先进先出的队列特性来的,显然这不是实际需求。

​ 在最开始的时候,我们就了解到如果在消息属性上设置TTL的方式,消息可能并不会按时“死亡”,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死新信队列,如果第一个消息的延时时间非常长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

​ 那么如何解决这个问题,就需要使用RabbitMQ的插件实现延迟队列

2.6、RabbitMQ的插件实现延迟队列

​ 如果不能实现在消息粒度上的TTL,并使其在设置的TL时间及时死亡,就无法设计成一个通用的延时队列。

2.6.1、安装RabbitMQ的延时队列插件

​ 在官网上下载https://www.rabbitmq.com/community-plugins.html,rabbitmq_delayed_message__exchange插件,然后解压放置到RabbitMQ的插件目录。

​ 下载3.8.0的连接rabbitmq_delayed_message_exchange-3.8.0.ez (github.com)

image-20240303124317709

​ 进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

​ 把插件放到RabbitMQ的插件文件夹

image-20240303124759301

​ 然后进行安装

image-20240303124912826

​ 重启Rabbitmq之后就可以了,可以看到交换机的类型新加一个x-delayed-message类型的

image-20240303125247971

​ 这样延迟的任务就交给交换机来做了,生产者把消息发给交换机,然后消息就在交换机延迟了,然后延迟时间过了之后就可以发给普通的队列了。

2.6.2、基于插件的延迟消息实现

​ 我们新加一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

在这里插入图片描述

​ 在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并 不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中。

配置文件类

//使用插件实现延迟
public class DelayedQueueConfig {
    //声明延迟类型的交换机名称
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
    //声明队列
    @Bean
    public Queue delayedQueue(){
        return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
    }
    //声明交换机CustomExchange是自定义交换机,由于使用插件提供的类型在原来的四个类型中没有所以只能自定义
    /* public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        super(name, durable, autoDelete, arguments);
        this.type = type;
    }
    第一个参数就是交换机的名称
    第二个参数就是交换机的自定义类型
    第三个参数就是是否持久化
    第四个参数就是是否自动删除
    第五个是其他参数
    * */
    @Bean
    public CustomExchange delayedExchange(){
        HashMap<String, Object> arguments = new HashMap<>();
        //设置自定义交换机的类型
        //消息确实是延迟了,但是怎么传播到队列呢,是要多播呢还是直连呢,所以还得设置类型
        //这里设置直连,怎么发送是直连的发送,啥时候发送就是延迟。
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
                true,false,arguments);
    }
    //绑定
    //返回中最后的构建,build有参数,noargs没有参数
    @Bean
    public Binding delayedBinding(@Qualifier("delayedQueue") Queue delayedQueue,
                                  @Qualifier("delayedExchange") CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

​ 生产者controller新加请求方法

//延迟插件发送消息
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendDelayMsg(@PathVariable String message,@PathVariable int delayTime){
    log.info("当前时间:{},发送一条时长为:{}秒的队列消息延迟队列delay.queue:{}",new Date(),delayTime,message);
    rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
            DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
        //发送消息设置延时时长
        msg.getMessageProperties().setDelay(delayTime*1000);
        return msg;
    });
}

​ 消费者

//专门消费由延迟交换机发出的消息
@Slf4j
@Component
public class DelayedLetterConsumer {
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayed(Message message){
        String s = new String(message.getBody());
        log.info("当前时间:{},收到delayed.queue队列的消息:{}",new Date(),s);
    }
}

我们启动服务浏览器输入localhost:8080/sendDelayMsg/这是第一条信息/20,localhost:8080/sendDelayMsg/这是第二条信息/2

image-20240303142214558

​ 可以看到结果是很正确的,第二条消息在发送2秒后就收到了,第一条消息在它发送20秒后收到的

2.6.3、总结

​ 延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

​ 当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1487373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

模型部署 - BevFusion - (1) - 思路总结

模型部署实践 - BevFusion 思路总结一、网络结构 - 总结1.1、代码1.2、网络流程图1.3、模块大致梳理 二、Onnx 的导出 -总体思路分析三、优化思路总结 学习 BevFusion 的部署&#xff0c;看了很多的资料&#xff0c;这篇博客进行总结和记录自己的实践 思路总结 对于一个模型我…

如何限制一个账号只在一处登陆

大家好&#xff0c;我是广漂程序员DevinRock&#xff01; 1. 需求分析 前阵子&#xff0c;和问答群里一个前端朋友&#xff0c;随便唠了唠。期间他问了我一个问题&#xff0c;让我印象深刻。 他问的是&#xff0c;限制同一账号只能在一处设备上登录&#xff0c;是如何实现的…

【Java项目介绍和界面搭建】拼图小游戏——美化界面

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏 …

AI企业发力智慧物流 HEGERLS四向车开启新一代托盘柔性物流解决方案

不论自动仓储、智能仓储&#xff0c;解决方案都需要更加平民化&#xff0c;普惠更多企业。柔性灵活、易于部署和扩展、初期投入成本低的方案一定是其中的重点。要实现这些特点&#xff0c;最重要的是硬件要做到标准化、软件要模块化&#xff0c;让仓储设备可以即插即用。凭借柔…

三维可视化技术在设备管理系统中的应用

随着科技的进步&#xff0c;传统的设备管理方法已经不能满足现代企业的需求。为了更高效地管理资产&#xff0c;设备管理系统开始采用三维可视化动态技术。这种技术不仅能够帮助用户快速找到相应的设备&#xff0c;还能够展示设备的现场位置、所处环境、关联设备以及设备参数等…

密钥加密机的工作原理

密钥加密机是信息安全领域中不可或缺的核心设备&#xff0c;它承担着保护通信内容、确保数据完整性以及验证信息发送方身份等重要任务。随着信息技术的迅猛发展&#xff0c;密钥加密机的作用愈发凸显&#xff0c;其安全性和可靠性直接关系到国家安全、商业机密和个人隐私等多个…

Ubuntu20安装zabbix-agent2,对接zabbix 6.4

在Ubuntu 20.04 LTS上安装Zabbix Agent 2并与Zabbix Server 6.4对接&#xff0c;请按照以下步骤操作&#xff1a; 更新系统&#xff1a; sudo apt update sudo apt upgrade 添加Zabbix官方仓库&#xff1a; 首先&#xff0c;需要将Zabbix的官方存储库添加到你的系统中以获取Za…

U盘无法读取?轻松掌握正确解决方法!

“为什么我的u盘插入电脑后会显示无法读取呢&#xff1f;想查看一些比较重要的文件&#xff0c;但就是无法读取U盘&#xff0c;想问问大家&#xff0c;我应该怎么操作呢&#xff1f;” U盘作为一种便捷的数据存储设备&#xff0c;广泛应用于我们的日常生活和工作中。然而&#…

探索前景:机器学习中常见优化算法的比较分析

目录 一、介绍 二、技术背景 三、相关代码 四、结论 一、介绍 优化算法在机器学习和深度学习中至关重要&#xff0c;可以最小化损失函数&#xff0c;从而改善模型的预测。每个优化器都有其独特的方法来导航损失函数的复杂环境以找到最小值。本文探讨了一些最常见的优化算法&…

程序员如何选择职业赛道?

程序员选择职业赛道就像是在一个充满挑战和机遇的迷宫中探索。不同的职业赛道代表着不同的路径&#xff0c;每条路径都有其独特的风景和挑战。我愿意为大家提供一些关于如何选择职业赛道的建议。本文将分为几个部分&#xff0c;包括了解自己、了解行业、职业规划、技能提升和持…

阿里云服务器4核8G配置多少钱?来看看,不看白不看!

阿里云服务器4核8g配置多少钱一年&#xff1f;1个月费用多少&#xff1f;云服务器u1实例3折优惠价955.58元一年&#xff0c;计算型c7云服务器4核8G价格2944.79元一年。4核8G服务器按月购买比较贵&#xff0c;经济型e实例4核8G配置1个月216元&#xff0c;通用算力型u1服务器336.…

自动化构建平台(四)Linux搭建私有CI/CD工具之Jenkins的安装

文章目录 前言一、Jenkins本地安装1、使用war文件安装2、使用yum或者app-get安装 二、docker安装Jenkins三、Jenkins登录、配置操作总结 前言 在CD领域&#xff0c;Jenkins应该是元老级别的存在&#xff0c;很多现代的devs平台多少都能看到Jenkins的影子&#xff0c;但是Jenki…

Nucleic Acids Research | scATAC-seq+CUTTag探究关键转录因子对视网膜细胞分化的调控作用

在中枢神经系统发育过程中&#xff0c;多能神经祖细胞如何产生不同的神经细胞类型仍然知之甚少。最近的scRNA-seq研究已经描绘了包括神经视网膜在内的许多神经系统中单个神经细胞类型的发育轨迹。进一步了解神经细胞多样性的形成需要了解表观遗传景观如何沿着个体细胞谱系变化以…

Java中继承的作用及解析

在 Java 中&#xff0c;继承是一种非常重要的面向对象编程特性。它的主要作用包括以下几个方面&#xff1a; 代码复用&#xff1a;通过继承&#xff0c;子类可以复用父类的代码&#xff0c;包括属性和方法。这样可以避免重复编写相同的代码&#xff0c;提高代码的复用性和可维护…

Qt/C++音视频开发67-保存裸流加入sps/pps信息/支持264/265裸流/转码保存/拉流推流

一、前言 音视频组件除了支持保存MP4文件外&#xff0c;同时还支持保存裸流即264/265文件&#xff0c;以及解码后最原始的yuv文件。在实际使用过程中&#xff0c;会发现部分视频文件保存的裸流文件&#xff0c;并不能直接用播放器播放&#xff0c;查阅资料得知原来是缺少sps/p…

开源问答平台网站源码系统 带完整的搭建教程

互联网的快速发展&#xff0c;用户对于信息的需求日益增长。问答平台以其独特的形式&#xff0c;让用户能够快速地找到答案、分享经验和交流想法。然而&#xff0c;市场上的问答平台大多数都是封闭的&#xff0c;不仅限制了用户的自由度和参与度&#xff0c;也增加了开发者和运…

C 嵌入式系统设计模式 19:保护调用模式

本书的原著为&#xff1a;《Design Patterns for Embedded Systems in C ——An Embedded Software Engineering Toolkit 》&#xff0c;讲解的是嵌入式系统设计模式&#xff0c;是一本不可多得的好书。 本系列描述我对书中内容的理解。本文章描述嵌入式并发和资源管理模式之五…

Linux服务器搭建超简易跳板机连接阿里云服务器

简介 想要规范内部连接阿里云云服务器的方式&#xff0c;但是最近懒病犯了&#xff0c;先搞一个简易式的跳板机过渡一下&#xff0c;顺便在出一个教程&#xff0c;其他以后再说&#xff01; 配置方法 创建密钥 登录阿里云&#xff0c;找到云服务器ECS控制台&#xff0c;点击…

Unity 脚本-生命周期常用函数

在Unity中&#xff0c;万物皆是由组件构成的。 右键创建C&#xff03;脚本&#xff0c;拖动脚本到某物体的组件列表。 生命周期相关函数 using System.Collections; using System.Collections.Generic; using UnityEngine;// 必须要继承 MonoBehaviour 才是一个组件 // 类名…

分付在哪些商户可以使用消费,微信分付怎么提取出来到余额上面来?

分付是一款信用支付产品&#xff0c;用户可以使用分付进行线上线下的消费支付。下面是使用分付的一些方法&#xff1a; - 开通分付&#xff1a;在微信中搜索并开通分付服务&#xff0c;按照提示完成实名认证和绑定银行卡等操作。 - 线上支付&#xff1a;在支持分付的线上商户…