RabbitMQ(二)

news2025/1/16 6:49:39

二、高级特性、应用问题以及集群搭建

高级特性

1.消息的可靠性投递

在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitMQ整个消息投递的路径为:
producer -> rabbitMQ broker -> exchange -> queue -> consumer

  • confirm确认模式
    confirm确认模式是再producer传递给exchange过程中控制消息的模式,当消息成功的从producer传递到了exchange,那么则会返回一个 confirmCallBack() 回调函数
  • return 退回模式
    return退回模式是指消息从exchange传递给queue过程中消息传递失败,则会返回一个returnCallBack() 回调函数

1.1 confirm确认模式的代码编写:

因为确认模式是producer到exchange,所以代码和配置修改应该写在生产者的模块中。
一步:开启确认模式

新版本的rabbitmq弃用了publish-confirms:true,可以改用
publisher-confirm-type: correlated实现同样的效果

spring:
  rabbitmq:
    password: heima
    username: heima
    port: 5673
    virtual-host: itcast
    host: 1.12.244.105
    #开启确认模式
    publisher-confirm-type: correlated

二步:编写confirmCallBack()函数
回调函数confirm()的返回值在发送消息成功时ack为true,但是我遇到一个问题,就是消息发送成功了,在队列中也能看到,但是返回值ack为false,

clean channel shutdown;

这是因为convertAncSend()方法结束后rabbitMQ的资源也就关闭了,所以就算成功了,回调函数返回值也是false;所以我们在后面强制睡眠200ms,让资源晚点关闭,这样的话得到的ack就是true了

package com.rabbitmq.springboot_mqproducer;

import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
class SpringbootMqProducerApplicationTests {

    @Resource
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() throws InterruptedException {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相关的配置信息
             * @param b 消息是否发送成功
             * @param s 消息发送失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm方法被执行了");
                System.out.println(b);
                if(b){
                    System.out.println("消息从producer -> exchange成功");
                    System.out.println("失败原因:" + s);
                }else{
                    System.out.println("消息从producer -> exchange失败");
                    System.out.println("失败原因:" + s);
                }
            }
        });
        rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"test.hello","测试springboot整合交换机");
        Thread.sleep(200);
    }
}

结果:
在这里插入图片描述

1.2 return回退模式的代码编写

一步:开启回退模式

spring:
  rabbitmq:
    password: heima
    username: heima
    port: 5673
    virtual-host: itcast
    host: 1.12.244.105
    #开启确认模式
    publisher-confirm-type: correlated
    #开启回退模式
    publisher-returns: true

二步:编写returnCallBack()函数
三步:设置exchange处理消息的模式
setMandatory为true,如果消息没有到队列queue,则返回消息给发送方
setMandatory为false,如果消息没有到队列queue,则丢弃消息(默认)

package com.rabbitmq.springboot_mqproducer;

import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
class SpringbootMqProducerApplicationTests {

    @Resource
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() throws InterruptedException {
        //编写confirm回调函数
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相关的配置信息
             * @param b 消息是否发送成功
             * @param s 消息发送失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm方法被执行了");
                System.out.println(b);
                if(b){
                    System.out.println("消息从producer -> exchange成功");
                    System.out.println("失败原因:" + s);
                }else{
                    //消息发送失败,需要做一些处理
                    System.out.println("消息从producer -> exchange失败");
                    System.out.println("失败原因:" + s);
                }
            }
        });
        //编写return回调函数
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println("return回退模式回调函数执行了");
                System.out.println("消息:"+returnedMessage.getMessage());
                System.out.println("exchange:"+returnedMessage.getExchange());
                System.out.println("replyCode:"+returnedMessage.getReplyCode());
                System.out.println("replyText:"+returnedMessage.getReplyText());
                System.out.println("routingKey:"+returnedMessage.getRoutingKey());

            }
        });
        //设置回退模式中,exchange处理消息的方式
        /*
        当将mandatory设置为false(默认值),如果RabbitMQ无法将消息路由,消息将会被静默丢弃,生产者不会收到通知。
        当设置mandatory为true时,意味着消息被视为"mandatory",如果在发布消息时RabbitMQ无法将消息路由到任何队列(例如由于没有匹配的队列与指定的路由键),则代理将通过调用ReturnListener回调的returnedMessage方法将消息返回给生产者(发布者)。生产者可以根据需要适当地处理这个返回的消息,例如记录日志或执行某些恢复操作。
         */
        rabbitTemplate.setMandatory(true);
        //TODO 这里把routingKey写错,是为了让交换机找不到queue,从而触发returnCallBack()函数
        rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"testtttt.hello","测试springboot整合交换机");
        Thread.sleep(200);
    }

}

消息的可靠投递小结:

  • 设置配置publisher-confirm-type: correlated开启确认模式
  • 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true, 则发送成功,如果为false,则发送失败,需要处理。
  • 设置ConnectionFactory的publisher-returns="true"开肩退回模式。
  • 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
  • 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
    使用channel下列方法,完成事务控制:
    txSelect(),用于将当前channel设置成transaction模式
    txCommit(),用于提交事务
    txRollback(),用于回滚事务

2.Consumer Ack

ack指Acknowledge,确认。表示消费端收到消息后的确认方式。
有三种确认方式:

  • 自动确认:acknowledge=“none”
  • 手动确认:acknowledge=“manual”
  • 根据异常情况确认:acknowledge=“auto”(这种方式很麻烦,不做讲解)

其中自动确认是指,当消息一旦被Consumer接收到, 则自动确认收到,并将相应message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck() 手动签收,如果出现异常,则调用channel.basicNack() 方法,让其自动重新发送消息。

代码编写:
发送消息的生产者端代码不用变,只需要能够发送消息就行
消费者端:
一步:编写yml配置文件

spring:
  rabbitmq:
    username: heima
    password: heima
    virtual-host: itcast
    host: 1.12.244.105
    port: 5673
    #设置消息为手动签收
    listener:
      simple:
        acknowledge-mode: manual #消费者端确认模式:none自动确认 manual手动确认 auto通过抛出异常的类型,来做响应的处理
        concurrency: 1 #当前监听的数量
        max-concurrency: 5 #最大监听数量
        retry:
          enabled: true #是否支持重试
          max-attempts: 4 #最大重试次数,默认为3

二步:编写消费者代码
消费者端创建一个listener并实现ChannelAwareMessageListener接口(其实也可以不实现该接口,只要 @RabbitListener 标记的方法,或者 @RabbitListener 标记的类+ @RabbitHandler 标记的方法的参数列表有[com.rabbitmq.client.Channel]和[org.springframework.amqp.core.Message]两个参数,都可以)

package com.rabbit.springboot_mqconsumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

/**
 * @author Watching
 * * @date 2023/7/19
 * * Describe:
 */
@Component
public class RabbitMQListener implements ChannelAwareMessageListener {
//    @RabbitListener(queues = {"boot_topic_queue"})//填写队列名称,可以以字符串数组的方式监听多个队列
//    public void listener(Message message){
//        System.out.println("message:"+message);
//    }

    /**
     * 使用ChannelAwareMessageListener监听器接口中的onMessage()方法来充当消费者,如果上面注释的方法与当前方法同时存在,一条消息只会被消费一次。不会被两个方法都消费
     *
     * @param message
     * @param channel
     * @throws Exception Consumer ACK机制:
     *                   1.设置手动签收。acknowledge= "manual”
     *                   2.让监听器类实现ChannelAwareMessageListener接口
     *                   3.如果消息成功处理,则调用channel的basicAck()签收
     *                   4.如果消息处理失败,则调用channel的basicNack( )拒绝签收,broker重新发送给consumer
     */
    @RabbitListener(queues = "boot_topic_queue" )
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try{
            //1.接收消息
            System.out.println("message:" + message);
            System.out.println("channel:" + channel);
            //2.处理业务逻辑
            System.out.println("模拟处理业务逻辑......");
            //3.手动签收
        /*
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        deliveryTag:
            当消费者接收到一条消息后,RabbitMQ 会为该消息分配一个唯一的 DeliveryTag。这个 DeliveryTag 是一个64位的长整型数值,并且只在该 Channel 内唯一,即相同 Channel 下的 DeliveryTag 不会重复。
        multiple:
            当 multiple 设置为 false 时,表示只确认当前指定的 deliveryTag 对应的一条消息。也就是说,只确认指定的单个消息已经成功被处理或处理失败。
            当 multiple 设置为 true 时,表示确认当前指定的 deliveryTag 及其之前所有未确认的消息(在同一个 Channel 下)。也就是说,会一次性确认多条消息的处理状态,将 deliveryTag 小于或等于指定 deliveryTag 的所有消息都确认处理了。
            这种批量确认的机制有助于提高消息的处理效率,特别是当消费者处理多条消息时,可以通过一次性确认多条消息的方式来减少网络开销和消费者端的负担。
            在使用 channel.basicAck(deliveryTag, multiple) 和 channel.basicNack(deliveryTag, multiple, requeue) 方法时,可以根据实际场景来选择是单条确认还是批量确认,以满足不同的业务需求。
         */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
            System.out.println("完成手动签收");
        }catch(Exception e){
            //4.出现异常,拒绝签收
            /*
            deliveryTag:一个唯一标识消息的64位长整型数值,用于确认消息的消费状态。
            multiple:一个布尔类型的参数,用于决定是否批量处理多条消息。若设置为 true,则会否定当前指定 deliveryTag 及其之前的所有未确认消息;若设置为 false,则只否定当前指定 deliveryTag 对应的一条消息。
            requeue:一个布尔类型的参数,表示是否将消息重新放回队列。若设置为 true,则消息会被重新入队列,RabbitMQ 会再次将它发送给消费者;若设置为 false,则消息会被直接丢弃,不会重新放回队列。
             */
            System.out.println("代码逻辑出现异常,拒收");
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
        }
    }
}

只需要两步,就可以实现Consumer ack,下面我们来测试一下:
首先是正常运行的代码的结果:(业务逻辑代码无异常)
在这里插入图片描述
生产者端是用的前面测试boot整合的代码
在这里插入图片描述
然后我们来测试业务逻辑代码出错的情况,我们在业务逻辑代码处添加一个除数不能为0的异常
在这里插入图片描述
再次运行代码,一直在重试,一直再报错
在这里插入图片描述

消息的可靠性总结

1.持久化:

  • exchange要持久化
  • queue要持久化
  • message要持久化

2.生产方确认Confirm(在后续文章中会讲解如何在回调函数中进行具体的处理
3.消费方确认Ack
4. Broker高可用(集群搭建

3.消费端限流

在A系统中,每秒最多只能处理1000条请求,如果在一秒钟只能瞬间有5000条请求打入A系统,那么A系统就会崩溃,所以我们在A系统中加入一个MQ中间件,让5000个请求先发送到MQ,然后A系统再分批次的从MQ中拉取1000条请求,这样A系统就避免了崩溃的情况。
这也是我们常说的MQ的削峰功能
在这里插入图片描述
设置MQ消费限流很简单,只需要设置两个属性:

  • 确认模式设置为手动确认(在上面的Ack我们已经讲过)
  • 设置prefetch属性,prefetch = n,n就是每次从MQ中获取消息的数量
    在这里插入图片描述
    其余的消费端代码和生产者端代码不用修改。
    当设置了消费端限流后,如果从MQ中取出1条消息,消费者端没有进行确认,那么消费者端将不会再从MQ中取消息,直到消息被确认。

4.TTL

TTL全称Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列Queue设置过期时间。
举一个例子:
生活中我们在购买商品的时候会下订单,系统会提示我们要在30分钟之内付款,否则订单将会被取消。
在这里插入图片描述

Ⅰ、先在控制台模拟上面的情况

①创建一个交换机
在这里插入图片描述
②创建一个队列
在这里插入图片描述
③进入交换机exchange_ttl和队列queue_ttl进行绑定
在这里插入图片描述
④消息的发布
在这里插入图片描述
⑤在消息队列中查看
将鼠标放上ttl,就可以看到设置的时间,等时间一过,这条消息就会被自动清除。
在这里插入图片描述

Ⅱ、代码实现队列过期,和消息过期

①创建交换机,队列,以及绑定关系

package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Watching
 * * @date 2023/7/18
 * * Describe:
 */
@Configuration
public class MQConfig {

    public static final String QUEUE_TTL_NAME = "queue_ttl";
    public static final String EXCHANGE_TTL_NAME = "exchange_ttl";



    /*
创建队列,测试ttl特性
 */
    @Bean("test_queue_ttl")
    public Queue ttlQueue() {
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl",10000);//消息过期的时间
        arguments.put("x-expires",100000);//队列过期的时间
        //设置队列的ttl时间
        return QueueBuilder.durable(QUEUE_TTL_NAME).withArguments(arguments).build();//参数的属性可以在控制台上查看
    }

    /*
创建一个交换机测试队列ttl特性
 */
    @Bean("test_exchange_ttl")
    public Exchange ttlExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_TTL_NAME).durable(true).build();
    }

    /*
    绑定ttl交换机和队列
     */
    @Bean
    public Binding ttlBinding(@Qualifier("test_exchange_ttl") Exchange exchange, @Qualifier("test_queue_ttl") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
    }
}

在创建队列时,我们指定了x-message-ttl,使队列中的所有消息都是一个固定的时间过期
我们还可以在发送消息时,指定每条消息的过期时间。
只需要在发送方法convertAndSend()方法中添加一个消息后处理参数即可


    /*
    MessagePostProcessor 是 Spring AMQP 中的一个接口,用于对消息进行后处理。
    通过实现该接口,你可以在发送消息之前对消息进行一些自定义处理,例如添加自定义的消息头、修改消息内容等。
     */
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //1.设置消息属性
            message.getMessageProperties().setExpiration("5000");//5000ms过期
            //2.返回该消息
            return message;
        }
    };
    @Test
    void testSend() throws InterruptedException {

        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_TTL_NAME, "ttl.hello", "测试ttl"+i,messagePostProcessor);
        }
        Thread.sleep(200);
    }

小细节:
①当队列设置了x-expires和x-messgae-ttl,消息过期时间以短的为准
②当队列设置了x-messgae-ttl,且发送消息时通过消息后处理也设置了过期时间,那么消息过期时间也以短的为准。
③当十条消息中只有一条消息设置了过期时间,这条消息过期后,只有处于队列顶端,即即将被消费时,才会对这条消息是否过期做判断。

5.死信队列

5.1 概念

死信队列,英文缩写: DLX ,Dead Letter Exchange (死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。(死信队列为什么英文翻译过来使死信交换机呢?因为交换机概念只有在RabbitMQ中才有,其它MQ中间件只有队列概念,所以习惯叫死信队列,而RabbitMQ中存在交换机概念,所以叫死信交换机。)
在这里插入图片描述
在这里我们需要理解的问题有:
①消息什么时候成为死信?

  • 队列长度达到限制,比如队列最多容纳10条消息,当第11条消息进入时,这条消息就成为了死信消息。
  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  • 原队列存在消息过期设置,消息到达超时时间却并未被消费

以上三种,满足一条即为死信消息

②队列如何绑定死信交换机?
队列设置参数:x-dead-letter-exchangex-dead-letter-routing-key
x-dead-letter-exchange:死信交换机的名称
x-dead-letter-routing-key:消息发送时指定的routingKey

在这里插入图片描述

5.2 代码实现死信队列

创建死信队列:

  • 1.声明正常的队列(test_queue_dLx)和交换机(test_exchange_dlx)
  • 2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
  • 3.正常队列绑定死信交换机,正常队列绑定死信队列不需要创建Binding Bean,只需要在正常队列创建时设置参数就可以
    – 设置两个参数:
    x-dead-letter-exchange:死信交换机名称
    x-dead-letter-routing-key:发送给死信交换机的routingkey

设置正常队列中的消息的过期时间x-message-ttl
设置正常队列的长度限制x-max-length

package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Watching
 * * @date 2023/7/18
 * * Describe:
 */
@Configuration
public class MQConfig {

    /**
     * 测试死信队列
     */
    /*
    创建普通交换机和普通队列
     */
    @Bean("test_exchange_dlx")
    public Exchange testDlxExchange() {
        return ExchangeBuilder.topicExchange("test_exchange_dlx").durable(true).build();
    }

    @Bean("test_queue_dlx")
    public Queue testDlxQueue() {
        Map<String,Object> map = new HashMap<>();
        //x-dead-letter-exchange:死信交换机名称
        map.put("x-dead-letter-exchange","exchange_dlx");
        //x-dead-letter-routing-key:发送给死信交换机的routingkey
        map.put("x-dead-letter-routing-key","dlx.hehe");//这个routingkey只需要满足死信交换机的路由规则就可以
        //设置正常队列中的消息的过期时间ttl
        map.put("x-message-ttl",10000);
        //设置正常队列的长度限制max-length
        map.put("x-max_length",10);
        return QueueBuilder.durable("test_queue_dlx").withArguments(map).build();
    }

    @Bean
    public Binding binding1(@Qualifier("test_exchange_dlx") Exchange exchange,@Qualifier("test_queue_dlx")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();
    }

    /*
    创建死信交换机和死信队列
     */
    @Bean("exchange_dlx")
    public Exchange dlxExchange() {
        return ExchangeBuilder.topicExchange("exchange_dlx").durable(true).build();
    }

    @Bean("queue_dlx")
    public Queue dlxQueue() {
        return QueueBuilder.durable("queue_dlx").build();
    }
    @Bean
    public Binding binding2(@Qualifier("exchange_dlx") Exchange exchange,@Qualifier("queue_dlx")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
    }

    /*
    绑定普通队列和死信交换机,并不需要写一个Binding,只需要在普通队列中添加参数就行
     */
}

发送消息测试死信消息:
1.过期时间
2.长度限制
3.消息拒收

    @Test
    void testDlx() {
        //1.过期时间
//        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","测试消息超出过期时间变成死信");
        //2.超出队列消息数量限制
//        for (int i = 0; i < 20; i++) {
//            rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hello", "测试消息超出队列数量限制变成死信");
//        }
        //3.消费端拒收
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","测试消息被拒收变成死信");
    }

死信队列小结:
1.死信交换机,死信队列和普通交换机,普通队列没有区别.
2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被重新路由到死信队列中
3.消息成为死信的三种情况

  • 消息在队列中到达超时时间并未被消费
  • 消息在消费者端被拒收,且设置了不重回队列
  • 队列长度存在限制,消息数量超出了限制

6.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
1.下单后,30分钟未支付,取消订单,回滚库存。
2.新用户注册成功7天后,发送短信问候。
实现方式:
1.定时器
2.延迟队列
订单系统将订单放入延迟队列种,30分钟后取出,去库存系统中判断订单是否已经支付,再进行后续的支付或者未支付操作
在这里插入图片描述
但是!
RabbitMQ官方没有提供延迟队列,所以我们需要使用ttl+死信队列构成延迟队列
普通队列设置为30min中过期,过期后消息路由到死信队列,库存系统从死信队列中取消息,这样就形成了一个延迟队列
在这里插入图片描述

代码实现延迟队列

1.定义正常交换机(order_exchange)和队列(order_queue),同时绑定
2.定义死信交换机(order_exchange_dlx) 和队列(order_queue_dlx),同时绑定
3.绑定正常队列和死信交换机,设置正常队列过期时间为10秒

    /**
     * 测试延迟队列
     */
    /*
    1.定义正常交换机(order_exchange)和队列(order_queue)
     */
    @Bean("orderQueue")
    public Queue orderQueue(){
        //3.正常队列绑定死信交换机
        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","order_exchange_dlx");
        map.put("x-dead-letter-routing-key","dlx.order.hehe");
        //设置正常队列的消息过期时间
        map.put("x-message-ttl",10000);
        return QueueBuilder.durable("order_queue").withArguments(map).build();
    }
    @Bean("orderExchange")
    public Exchange orderExchange(){
        return ExchangeBuilder.topicExchange("order_exchange").build();
    }
    @Bean
    public Binding orderBinding(@Qualifier("orderQueue")Queue queue,@Qualifier("orderExchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }

    /*
    2.定义死信交换机(order_exchange_dlx) 和队列(order_queue_dlx)
     */
    @Bean("orderQueueDlx")
    public Queue orderQueueDlx(){
        return QueueBuilder.durable("order_queue_dlx").build();
    }
    @Bean("orderExchangeDlx")
    public Exchange orderExchangeDlx(){
        return ExchangeBuilder.topicExchange("order_exchange_dlx").build();
    }
    @Bean
    public Binding orderBindingDlx(@Qualifier("orderQueueDlx")Queue queue,@Qualifier("orderExchangeDlx")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();
    }

4.创建生产者发送消息

    /**
     * 测试延迟队列
     */
    @Test
    void testDelay() throws InterruptedException {
        rabbitTemplate.convertAndSend("order_exchange","order.test","测试延迟队列");
        for (int i = 10;i > 0;i--){
            System.out.println(i+"...");
            Thread.sleep(1000);
        }
    }

5.创建消费者

package com.rabbit.springboot_mqconsumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * @author Watching
 * * @date 2023/8/2
 * * Describe:
 */
@Component
public class OrderListener implements ChannelAwareMessageListener {

    @RabbitListener(queues = "order_queue_dlx")//监听死信队列
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try{
            //1.接收message
            System.out.println("message:"+message);
            //2.处理业务逻辑
            System.out.println("处理业务逻辑");
            System.out.println("根据订单id在数据库中查询订单状态");
            System.out.println("判断订单是否支付成功");
            System.out.println("未支付,回滚库存,取消订单");
            //3.手动签收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }catch (Exception e){
            //4.业务出错,拒绝签收
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);//业务出错,拒签后要将这条消息重新放回死信队列
        }
    }
}

延迟队列小结:
1.延迟队列指消息进入队列后,可以被延迟一定时间,再进行消费。
2. RabbitMQ没有提供延迟队列功能,但是可以使用: TTL + DLX来实现延迟队列效果。

应用问题

1.消息补偿

消息补偿机制

2.幂等性保障

幂等性保障

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/829900.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql-入门笔记-3

# ----------排序查询-------- # 语法 # select 字段列表 from 表名 order by 字段1 排序方式1 ,字段2 排序方式2 ; DESC 降序 ASC升序 # 1 根据年龄对公司的员工进行升序排序---默认升序-黄色提示代码冗余 select * from userTable order by age ASC ; # 2 根据入职时间,对员…

LPython:最新的高性能Python实现、速度极快且支持多后端

LPython 是最新的开源 Python 实现&#xff0c;目标是打造高性能版本的 Python。 LPython 官网写道&#xff0c;它一直作为 Python 编译器在开发&#xff0c;能够生成优化的机器代码。LPython 的后端支持 LLVM、C/C 翻译&#xff0c;甚至还支持 WebAssembly (WASM)。 LPython 主…

CGAL-几何对象基础判断-点线段使用-hello world

文章目录 1.概述2.点和线段3.点序列的凸包3.1.内置数组中点的凸包3.2.向量中点的凸包 4.关于kernel和Traits类5.概念和模型 1.概述 本教程是为CGAL新手&#xff0c;大概知道c和几何算法的基本知识。第一部分展示了如何定义点和段类&#xff0c;以及如何在它们上应用几何谓词。…

环球数科、BUFFALO面试(部分)

环球数科 系统复杂且需求迭代频繁&#xff0c;如何维护微服务之间的接口调用关系&#xff1f; API接口在设计的时候需要大量的需求文档&#xff0c;而且文档也需要不断维护。如何高效维护API文档就很重要了。以下是一些常见的API管理工具&#xff1a;Swagger&#xff1a;Swag…

云主机OOM宕机原因分级及处理

一、故障现象 某次服务器告警宕机故障&#xff0c;无法ssh连入&#xff0c;控制台登录后查看&#xff0c;发生OOM事件&#xff0c;OOM就是我们常说的Out of Memory内存溢出&#xff0c;它是指需要的内存空间大于系统分配的内存空间&#xff0c;导致项目程序crash&#xff0c;甚…

【MySQL】使用C/C++连接MySQL数据库

【MySQL】使用C/C连接MySQL数据库 验证使用select特殊点 本文目的&#xff1a;使用MySQL提供的CAPI完成对数据库的操作 验证 #include <iostream> #include <mysql/mysql.h>int main() {std::cout<<"mysql cilent version: "<<mysql_get_cl…

面试热题100(二叉树的右视图)

给定一个二叉树的 根节点 root&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所能看到的节点值。 树这类问题用的最多的就是递归&#xff0c;因为树具有天然的递归结构&#xff1a; 我们来分析一下题目&#xff0c;给定一棵树根结…

回归预测 | MATLAB实现SO-CNN-GRU蛇群算法优化卷积门控循环单元多输入单输出回归预测

回归预测 | MATLAB实现SO-CNN-GRU蛇群算法优化卷积门控循环单元多输入单输出回归预测 目录 回归预测 | MATLAB实现SO-CNN-GRU蛇群算法优化卷积门控循环单元多输入单输出回归预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现SO-CNN-GRU蛇群算法优化卷积门控循…

nvm安装和使用

公司不同系统用的node版本不一样&#xff0c;所以就需要安装多版本了&#xff0c;那么使用nvm来管理就大大方便了开发。 使用nvm有哪些好处呢 安装node很方便&#xff0c;只需要一条命令可以轻松切换node版本可以多版本node并存 需要注意的是安装之前先把原有的node给卸载掉…

24考研数据结构-线索二叉树的线索化

目录 数据结构&#xff1a;线索二叉树与线索化线索二叉树的定义线索化过程线索化的应用总结 5.3.2线索二叉树1. 线索二叉树的概念与作用2.线索二叉树的存储结构3. 二叉树的线索化1. 中序线索化2. 先序线索化3. 后序线索化 4. 线索树的寻找前驱后继的各种情况&#xff08;多理解…

向“数”而“深”,联想凌拓的“破局求变”底气何来?

前言&#xff1a;要赢得更多机遇&#xff0c;“破局求变”尤为重要。 【全球存储观察 &#xff5c; 热点关注】2019年2月25日&#xff0c;承袭联想集团与NetApp的“双基因”&#xff0c;联想凌拓正式成立。历经四年多的发展&#xff0c;联想凌拓已成为中国企业级数据管理领域的…

opencv-29 Otsu 处理(图像分割)

Otsu 处理 Otsu 处理是一种用于图像分割的方法&#xff0c;旨在自动找到一个阈值&#xff0c;将图像分成两个类别&#xff1a;前景和背景。这种方法最初由日本学者大津展之&#xff08;Nobuyuki Otsu&#xff09;在 1979 年提出 在 Otsu 处理中&#xff0c;我们通过最小化类别内…

C语言-------函数栈帧的创建和销毁------剖析描骨

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; &#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382;…

Talk | 新加坡国立大学博士生施宇钧:DragDiffusion-基于扩散模型的关键点拖拽图片编辑

本期为TechBeat人工智能社区第518期线上Talk&#xff01; 北京时间8月2日(周三)20:00&#xff0c; 新加坡国立大学博士生—施宇钧的Talk已准时在TechBeat人工智能社区开播&#xff01; 他与大家分享的主题是: “DragDiffusion-基于扩散模型的关键点拖拽图片编辑”&#xff0c;他…

浅谈机器视觉

目录 1.什么是机器视觉 2.学习机器视觉需要掌握的知识 3.机器视觉的由来 4.机器视觉带来的福利 1.什么是机器视觉 机器视觉&#xff08;Computer Vision&#xff09;是人工智能领域中的一个分支&#xff0c;旨在通过模仿人类的视觉系统&#xff0c;使计算机能够理解和解释图…

使用uni-app的uniCloud 云数据库入门:实现一个简单的增删改查

官方云数据库文档 前置步骤使用uni-app新建一个uniCloud项目 [外链图片转存失败,源站可能有防盗官方云数据库文档]!链机制,建议将()https://uniapp.dcloud.net.cn/uniCloud/hellodb.html)] 新建表 这里我加了几个测试字段 createTime、remark、money // 文档教程: https://un…

深度强化实车部署教程

强化學習仿真實車部署 前言 这里讲一下如何部署 有两种方式部署&#xff1a; 第一种实车远程控制&#xff1a;即通过roscore中的IP设置实现远程控制&#xff1b;具体可以参考turtlebot3的PC连接turtlebot3并控制的教程&#xff1b;我使用的是这种方法&#xff1b; 第二种直…

一条命令重启supervisor所有RUNNING状态的进程

supervisorctl status | grep RUNNING | awk {print $1} | xargs -n1 supervisorctl restart

选择适合的项目管理系统,了解有哪些选择和推荐

随着科技的进步和全球竞争的加剧&#xff0c;项目管理已经成为企业成功的关键要素。为了更好地组织和监控项目&#xff0c;许多企业和组织正在采用项目管理系统(PMS)。本文将探讨项目管理系统的主要组成部分以及其在实际应用中的优势。 “项目管理系统有哪些?国际上比较常见的…

GCC版本升高到11.3后编译之前同样的C++代码出现的若干错误

目录 1 gtest-death-test.cc:1301:24: error: ‘dummy’ may be used uninitialized 2 error: ‘void* memcpy(void*, const void*, size_t)’ copying an object of non-trivial type ‘Eigen::internal::Packet4c’ 3 error: comparison is always true due to limited ra…