【SpringBoot整合RabbitMQ(下)】

news2025/1/12 20:49:57

八、死信队列

        先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了, consumer queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列。

8.1、死信的三大来源

代码结构图:

 架构图逻辑说明:

生产者发送消息至交换机(正常交换机),由交换机根据routing-key决定发送到哪个队列(正常队列),此时触发以下三种条件之一,正常队列将消息发送到死信交换机,并指明routing-key发往死信队列,再由死信队列消费者进行消费。

8.1.1、消息 TTL 过期

生产者:

    private static final String EXCHANGE_NAME = "normal_exchange";
    /**
     * 设置过期时间导致消息死信
     * @param args
     * @throws Exception
     */
    public static void main1(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息 设置TTL时间 -》time to live 单位是ms
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        //发消息
        for (int i = 0; i < 10; i++) {
            String message = "info" + i;
            channel.basicPublish(EXCHANGE_NAME,"zhangsan",properties,message.getBytes(StandardCharsets.UTF_8));
        }
    }

正常消费者:

    /**
     * 正常交换机名称
     */
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    /**
     * 死信交换机名称
     */
    private static final String DEAD_EXCHANGE = "dead_exchange";
    /**
     * 交换机类型
     */
    private static final String TYPE = "direct";
    /**
     * 普通队列名称
     */
    private static final String NORMAL_QUEUE = "c1";
    /**
     * 死信队列名称
     */
    private static final String DEAD_QUEUE = "c2";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明普通交换机和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE,TYPE);
        channel.exchangeDeclare(DEAD_EXCHANGE,TYPE);
        //声明普通队列
        Map<String,Object> argument = new HashMap<>();
        //正常的队列设置消息变成死信之后发送给哪个死信交换机
        argument.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信rountingKEY-》即死信消息发送给死信交换机后发往哪个死信队列
        argument.put("x-dead-letter-routing-key","lisi");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //绑定普通交换机与队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信交换机与队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("C1等待接收消息......");
        //声明 接收消息
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            String msg = new String(message.getBody(),"UTF-8");
             System.out.println(msg);
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        //开启手动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);
    }

 生产者和消费者线程启动后将消费者线程杀死,即消息无法被消费,等待消息过期后查看死信队列当中的消息数量或死信消费者的控制台输出。

注意:消息过期时间也可以由队列来决定,argument集合中添加如下参数即可:

        //过期时间-》10s
        argument.put("x-message-ttl",10_000);

8.1.2、队列达到最大长度(队列满了,无法再添加数据到 mq )

生产者:

public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        for (int i = 0; i < 10; i++) {
            String message = "info" + i;
            channel.basicPublish(EXCHANGE_NAME,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8));
        }
    }

消费者:

声明队列时添加参数(其余代码与情况1代码一致):

        //设置正常队列的最大容量
        argument.put("x-max-length",6);

先启动消费者线程,由消费者线程声明完交换机和队列后将消费者线程杀死,此时再启动生产者线程发送消息,此处发送十条消息,由于队列最多存储6条消息,其余4条则发往死信队列,生产者程序执行完成后查看WEB端两个队列当中的消息条数即可。

8.1.3、消息被拒绝(basic.reject basic.nack)并且 requeue=false.

消费者(整体代码与情况1一致,区别在于接收到消息时的回调函数):

        DeliverCallback deliverCallback = (consumeTag, message) ->{
            String msg = new String(message.getBody(),"UTF-8");
            if (msg.equals("info5")){
                //拒绝该消息接收
                System.out.println("拒绝该消息:" + msg);
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else {
                System.out.println("Consumer1接收的消息是:" + new String(message.getBody(),"UTF-8"));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        };

 生产者和消费者线程一并启动,生产者代码与情况2当中的代码一致,当发送消息为:“info5”时正常消费者拒绝该消息,此时正常队列将该消息发送到死信队列进行处理。

8.1.4、死信消费者

    private static final String DEAD_QUEUE = "c2";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明 接收消息
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            System.out.println("Consumer2接收的消息是:" + new String(message.getBody(),"UTF-8"));
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
    }

九、整合SpringBoot

添加依赖:

<dependencies>
 <!--RabbitMQ 依赖-->
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.47</version>
 </dependency>
 <dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 </dependency>
 <!--swagger-->
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger2</artifactId>
 <version>2.9.2</version>
 </dependency>
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger-ui</artifactId>
 <version>2.9.2</version>
 </dependency>
 <!--RabbitMQ 测试依赖-->
 <dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit-test</artifactId>
 <scope>test</scope>
 </dependency>
</dependencies>

配置文件:

spring:
  rabbitmq:
    host: 43.138.78.150
    port: 5672
    username: admin
    password: 123
    virtual-host: /test
    publisher-confirm-type: CORRELATED
    publisher-returns: true

十、延迟队列

延迟队列是死信队列的第一种情况(消息TTL过期)进一步演化而来的,原因如下:

在我们上述死信队列代码演示当中说明了两种设置消息过期的方法,一种是发送消息时指明消息多久后过期,另外一种是设置某队列当中消息的过期时间,这两种方式是有差异的,如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

10.1、简单延迟队列实现

实现逻辑:和上述的死信队列实现方式类似,区别在于我们不再需要正常消费者,只需要设置死信消费者即可。

代码结构图:创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

生产者:

由于整合了SpringBoot,我们将不再使用main函数的形式发送消息,而是采用web界面的输入来发送消息。
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    //发消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:"+message);
    }
}

队列声明、交换机声明、绑定关系:

@Configuration
public class QueueTtlConfig {
    //普通交换机的名称
    private static final String NORMAL_EXCHANGE = "X";
    //死信交换机的名称
    private static final String DEAD_EXCHANGE = "Y";
    //普通队列1的名称
    private static final String NORMAL_QUEUE_1 = "QA";
    //普通队列2的名称
    private static final String NORMAL_QUEUE_2 = "QB";
    //死信队列的名称
    private static final String DEAD_QUEUE = "QD";


    //声明X 交换机
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    //声明Y 交换机
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(DEAD_EXCHANGE);
    }

    //声明普通队列A TTL为10s
    @Bean("queueA")
    public Queue queueA(){
        Map<String,Object> arguments = new HashMap<>(3);
        //过期时间-》10s
        arguments.put("x-message-ttl",10_000);
        //正常的队列设置消息变成死信之后发送给哪个死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信routingKEY-》即死信消息发送给死信交换机后发往哪个死信队列
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(NORMAL_QUEUE_1).withArguments(arguments).build();
    }

    //声明普通队列B TTL为40s
    @Bean("queueB")
    public Queue queueB(){
        Map<String,Object> arguments = new HashMap<>(3);
        //过期时间-》10s
        arguments.put("x-message-ttl",40_000);
        //正常的队列设置消息变成死信之后发送给哪个死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信routingKEY-》即死信消息发送给死信交换机后发往哪个死信队列
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(NORMAL_QUEUE_2).withArguments(arguments).build();
    }


    //声明死信队列
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }
    //队列QA和交换机X绑定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("XA");
    }
    //队列QB和交换机X绑定
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("XB");
    }
    //队列QD和交换机Y绑定
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("YD");
    }
}

 消费者:

@Component
@Slf4j
public class Consumer {
    //接收消息
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMsg(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("接收到的队列confirm_queue消息:{}",msg);
    }
}

 执行测试:

浏览器输入:http://localhost:8080/ttl/sendMsg/你好

 可以看到死信消费者消费了两个队列发送给死信队列的消息。

10.2、延迟队列优化

在上面实现的延迟队列当中,我们的发送消息的延迟时间是固定的,只有10s和40s两种,但是在实际应用场景当中,我们的延迟时间是不固定的,换句话说,延迟时间是由用户的需求所决定的。因此,我们需要再生产者这边就决定消息的延迟时间。

代码架构图:在上面实现的架构当中添加队列QC,绑定关系如下,不设置其TTL时间。

生产者:

    //发消息-自定义TTL
    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendExpirationMsg(@PathVariable String message,@PathVariable String ttlTime){
        log.info("当前时间:{},发送一条过期时长为{}毫秒的消息给随机TTL队列:{}",new Date(),ttlTime,message);
        rabbitTemplate.convertAndSend("X","XC","消息来自TTL为自定义的队列:"+message,msg ->{
            //发送消息的时候 延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

消费者:消费者没有变化

队列声明、交换机声明、绑定关系:在上面的声明配置类当中添加如下代码

    //普通队列3的名称
    private static final String NORMAL_QUEUE_3 = "QC";
    //声明普通队列C
    @Bean("queueC")
    public Queue queueC(){
        Map<String,Object> arguments = new HashMap<>(3);
        //正常的队列设置消息变成死信之后发送给哪个死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信routingKEY-》即死信消息发送给死信交换机后发往哪个死信队列
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(NORMAL_QUEUE_3).withArguments(arguments).build();
    }
    //队列QC和交换机X绑定
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queue,@Qualifier("xExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("XC");
    }

 测试:

浏览器输入:

http://localhost:8080/ttl/sendExpirationMsg/ 你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/ 你好 2/2000
控制台输出:

 观察控制台我们发现就有问题了,20s时我们发送“你好1”,延迟时间20s,41s时死信队列收到该消息,有1s的误差是可以接受的,因为网络传输也需要时间,但是,我们30s时发送“你好2”,延迟时间2s,按道理来说是需要在32s左右时收到该消息,可以看到,收到该消息的时间与消息1一致,这就是我们上面所说的在发送消息时设置TTL属性的弊端。究其原因:因为RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

10.3、解决延迟队列优化后的弊端

10.3.1、安装延迟队列插件

在官网上下载 https://www.rabbitmq.com/community-plugins.html
下载 rabbitmq_delayed_message_exchange 插件,
然后解压放置到 RabbitMQ 的插件目录。
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

10.3.2、安装完成后

10.3.3、代码架构图

在这里新增了一个队列 delayed.queue, 一个自定义交换机 delayed.exchange ,绑定关系如下 :

10.3.4、代码实现

        在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia( 一个分布式数据系统 ) 表中,当达到投递时间时,才投递到目标队列中。

声明:

@Configuration
public class DelayedQueueConfig {
    //交换机
    private static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
    //队列
    private static final String DELAYED_QUEUE_NAME = "delayed_queue";
    //routing-key
    private static final String DELAYED_ROUTING_KEY = "delayed_routingKey";

    //声明队列
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //声明交换机
    @Bean
    public CustomExchange delayedExchange(){
        Map<String,Object>  arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");
        /**
         * 1.交换机的名称
         * 2.交换机的类型
         * 3.是否需要持久化
         * 4.是否需要自动删除
         * 5.其他参数
         */
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
    }


    //队列和交换机绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchange customExchange){
        return BindingBuilder.bind(queue).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者:

    //发消息-基于插件 发送消息+时间
    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendDelayedMsg(@PathVariable String message,@PathVariable Integer delayTime){
        log.info("当前时间:{},发送一条过期时长为{}毫秒的消息给随机TTL队列:{}",new Date(),delayTime,message);
        rabbitTemplate.convertAndSend("delayed_exchange","delayed_routingKey","消息来自TTL为自定义的队列:"+message,msg ->{
            //发送消息的时候 延迟时长
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }

消费者:

@Slf4j
@Component
public class DelayQueueConsumer {
    //接收消息
    @RabbitListener(queues = "delayed_queue")
    public void receiveD(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody(),"UTF-8");
        log.info("当前时间:{},收到死信队列的消息:{}",new Date(),msg);
    }
}

 10.3.5、测试验证

浏览器输入:

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
控制台效果:

十一、发布确认高级模式

在上一篇中我们将了发布确认模式的必要性,我们此时考虑一个复杂的场景,例如,生产者发送消息时,MQ此时刚好宕机重启了,甚至整个MQ集群都不可用了,那重启期间生产者发送的消息就会丢失,需要我们手动处理和恢复。此时我们思考一个问题,如何保证消息可靠投递呢?换句话说,极端情况下,无法投递的消息我们该如何处理?

11.1、基础逻辑代码

配置类:

@Configuration
public class ConfirmConfig {
    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //routingKey
    public static final String ROUTING_KEY = "key1";
    //声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
       return new DirectExchange(CONFIRM_EXCHANGE_NAME);

    }
    //声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,
                                        @Qualifier("confirmQueue") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
    }
}

生产者:

@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY,message1.getBytes(StandardCharsets.UTF_8));
        log.info("发送消息内容:{}",message);

}

消费者:

@Component
@Slf4j
public class Consumer {
    //接收消息
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMsg(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("接收到的队列confirm_queue消息:{}",msg);
    }
}

11.2、消息发送失败以及消息回退处理(交换机未收到、队列未收到) 

配置文件添加如下配置:

    publisher-confirm-type: CORRELATED//发布消息成功到交换器后会触发回调方法

11.2.1、消息未被交换机或队列收到时的回调函数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的此时就需要添加如下配置:

    publisher-returns: true//消息回退
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    //注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    //交换机确认回调方法
    /**
     * 1.发消息 交换机收到了-》回调
     *  1.1 correlationData 保存了回调消息的ID及相关信息
     *  1.2 交换机是否收到消息 true
     *  1.3 交换机没有收到消息的原因-》null
     * 2.发消息 交换机接收失败了-》回调
     *  2.1 correlationData 保存了回调消息的ID及相关信息
     *  2.2 交换机是否收到消息 false
     *  2.3 交换机没有收到消息的原因-》reason
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "0";
        if (b){
            log.info("交换机已经收到了ID为:{}的消息",id);
        }else {
            log.info("交换机没有收到ID为:{}的消息,原因为:{}",id,s);
        }
    }


    //当生产者成功发送消息至交换机后但未发送至队列时的回调函数
    //当消息不可达队列时才会触发此函数
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息{}被交换机{}退回,退回的原因:{},路由key:{}",
                returnedMessage.getMessage().getBody().toString(),
                returnedMessage.getExchange(),
                returnedMessage.getReplyText(),
                returnedMessage.getRoutingKey());
    }

}

 验证:

①、交换机错误

 将发送消息时的交换机名称进行修改,即可观察到所触发的回调函数。

 rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME + "123",ConfirmConfig.ROUTING_KEY,message1.getBytes(StandardCharsets.UTF_8),correlationData);

②、路由错误

  生产者代码修改:

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        String message1 = message + "key1";
        CorrelationData correlationData = new CorrelationData();
        String id = String.valueOf(UUID.randomUUID());
        correlationData.setId(id);
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY,message1.getBytes(StandardCharsets.UTF_8),correlationData);
        log.info("发送消息1内容:{}",message + "key1");

        CorrelationData correlationData2 = new CorrelationData();
        String id2 = String.valueOf(UUID.randomUUID());
        correlationData2.setId(id2);
        String message2 = message + "key12";
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY + "2",message2.getBytes(StandardCharsets.UTF_8),correlationData2);
        log.info("发送消息2内容:{}",message + "key12");
    }

十二、备份交换机 

        有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

 12.1、代码结构图及其逻辑

1.Web端输入消息内容

2.控制器将消息封装成两份,message1和message2,message1使用正确的routing-key路由到队列当中,message2使用错误的routing-key路由不到队列中。

3.message1由正常消费者进行消费,message2回退至交换机,由交换机发往备份交换机进行处理,最后由报警队列的消费者进行消费。

12.2、代码

1.生产者:生产者代码使用发布确认高级模式当中的生产者代码。

2.普通交换机及其队列声明:

声明同样采用发布确认高级模式当中的代码,只需要将交换机声明的代码修改为如下代码:

    //声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange",MsgFailSendConfig.BACKUP_EXCHANGE_NAME);
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArguments(arguments).build();

    }

 使用如上配置后,消息未路由到队列时,将消息发送至备份交换机。 

3.备份交换机及其队列声明:

@Configuration
public class MsgFailSendConfig {
    //备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    //备份队列
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    //报警队列(或者二次处理队列)
    public static final String WARNING_QUEUE_NAME = "warning_queue";

    //声明备份交换机
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }
    //声明备份队列和报警队列
    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }
    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }
    //绑定
    @Bean
    public Binding backupBindingExchange(@Qualifier("backupQueue") Queue queue,
                                         @Qualifier("backupExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    @Bean
    public Binding warningBindingExchange(@Qualifier("warningQueue") Queue queue,
                                          @Qualifier("backupExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

}

12.3、效果验证

 浏览器输入:

http://localhost:8080/confirm/sendMsg/你好

控制台效果:

十三、优先级队列

13.1、使用场景

在我们系统中有一个 订单催付 的场景,我们的客户在天猫下的订单 , 淘宝会及时将订单推送给我们,如 果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创 造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景, 所以订单量大了后采用 RabbitMQ 进行改造和优化 , 如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。

13.2、如何添加

a.控制台页面添加

b.代码层面添加

        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-max-priority",10);//设置最大优先级 取值范围0-255 优先值越大,优先级越高
        /**
         * 创建一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)
         * 3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费
         * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);

13.3、实战

生产者:

public class Producer {
    //队列名称
    private static final String QUEUE_NAME = "hello";

    //发消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-max-priority",10);//设置最大优先级 取值范围0-255 优先值越大,优先级越高
        /**
         * 创建一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)
         * 3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费
         * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);
        for (int i = 11; i < 21; i++) {
            String message = "info:" + i;
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(i-10).build();//设置消息优先级
            /**
             * 发送一个消息
             * 1.发送到哪个交换机
             * 2.路由的key值是哪个 本次是队列名称
             * 3.其他参数信息
             * 4.发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,properties,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息体:" + message + " 优先级:" + (i - 10));
        }
    }
}

消费者:

public class Consumer {
    //队列的名称
    public static final String QUEUE_NAME = "hello";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 声明 接收消息
         */
        DeliverCallback deliverCallback = (consumeTag, message) ->{
            System.out.println(new String(message.getBody()));
        };
        //取消消费的回调
        CancelCallback cancelCallback = (consumeTag) -> {
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否自动应答 true代表的是自动应答 false代表的是手动应答
         * 3.当一个消息发送过来后的回调接口
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

 结果验证:

 

 

十四、惰性队列

14.1、使用场景

        RabbitMQ 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因( 比如消费者下线、宕机亦或者是由于维护而关闭等 ) 而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

14.2、两种模式

队列具备两种模式: default lazy
代码设置:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/533635.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用Pin自动对二进制文件脱壳

Intel Pin Intel Pin在可执行二进制代码中插入一些探测函数,用于观察、记录、分析宿主代码执行过程中的一些与计算机体系结构相关的特性,如访存指令,寄存器内容,寄存器地址等,通过Pin提供的API可以编写各种分析函数,这样程序运行完以后,统计和分析结果也同时产生,分析…

I3D--视频理解必读论文总结

论文标题&#xff1a;Quo Vadis, Action Recognition? A New Model and the Kinetics会议期刊&#xff1a; CVPR 2017Dataset 论文地址&#xff1a;https://arxiv.org/pdf/1705.07750.pdf 文章目录 前言文章核心摘要引入方法a. 2DConvLSTMb. 3DConvc d. Two-StrwamTwo-Stream …

C语言学习分享(第七次)------操作符

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C语言学习分享⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习更多C语言知识   &#x1f51d;&#x1f51d; 操作符详解 1. 前言&#x1f6a9;2…

Ajax,前后端分离开发,前端工程化,Element,Vue路由,打包部署

Ajax介绍 Axios <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-wid…

为什么我掌握了大量软测知识,却还是找不到工作?

很多朋友都在疑惑&#xff0c;为什么随着对于软件测试了解的加深&#xff0c;不断掌握更多测试知识与技巧&#xff0c;找工作貌似越来越难了&#xff1f; 不免让人联想到最近偶然间看到一句话&#xff1a;“软件测试是整个 IT 行业中最差的岗位”。 打工人的问题出在哪&#xf…

使用Jmeter应该如何进行http接口性能测试?

在进行网页或应用程序后台接口开发时&#xff0c;一般要及时测试开发的接口能否正确接收和返回数据&#xff0c;对于单次测试&#xff0c;Postman插件是个不错的Http请求模拟工具。 但是Postman只能模拟单客户端的单次请求&#xff0c;而对于模拟多用户并发等性能测试&#xff…

11.1网络编程——

多线程 一、基础知识概念相关API二、任务创建一个简单的本地客户端创建一个简单的本地服务器三、总结四、问题一、基础知识 概念 网络编程中客户端和服务器指的是进程,而不是常提到的机器或者主机。注意三个概念:请求、响应、事务。 网络编程中客户端-服务器事务是指客户端和…

刷题day65:分割等和子集

题意描述&#xff1a; 给你一个 只包含正整数 的 非空 数组 nums 。请你判断是否可以将这个数组分割成两个子集&#xff0c;使得两个子集的元素和相等。 思路&#xff1a; 使用01背包&#xff0c; 背包的体积为sum / 2背包要放入的商品&#xff08;集合里的元素&#xff09;…

linux数据校验

文件 一般对于文件的校验使用md5&#xff0c;centos7系统有自带的md5校验工具md5sum&#xff0c;可以用来校验两个文件是否一致 可以对比一下md5值是否一致来校验文件是否一致 目录 1 若是在主机上使用网络磁盘挂载备份的可以使用diff工具对比两个目录是否一致 diff -r /op…

Nginx使用教程

目录 一、Nginx介绍二、下载和安装三、Nginx命令1.查看版本2.检查配置文件正确性3.启动和停止4.重新加载配置文件 四、配置文件结构五、Nginx具体应用1.部署静态资源2.反向代理3.负载均衡 一、Nginx介绍 Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件( IMAP/POP3)代…

少儿编程 中国电子学会图形化编程等级考试Scratch编程二级真题解析(选择题)2023年3月

2023年3月scratch编程等级考试二级真题 选择题(共25题,每题2分,共50分) 1、小猫的程序如图所示,积木块的颜色与球的颜色一致。点击绿旗执行程序后,下列说法正确的是 A、小猫一直在左右移动,嘴里一直说着“抓到了”。 B、小猫会碰到球,然后停止。 C、小猫一直在左右…

《LKD3粗读笔记》(13)虚拟文件系统

《LKD3粗读笔记》(13)虚拟文件系统 虚拟文件系统&#xff0c;简称VFS&#xff0c;是内核的子系统&#xff0c;为用户空间程序提供了文件系统相关的接口。系统中所有文件系统不但依赖VFS共存&#xff0c;而且也依靠VFS系统协同工作。通过虚拟文件系统&#xff0c;程序可以利用标…

文本三剑客正则表达式2

文章目录 文本三剑客&正则表达式21 sed2 sed命令的常用选项3 sed命令的操作符4 打印4.1 按照行号寻址打印4.1.1 只打印第二行4.1.2 只显示行号4.1.3 显示行号及内容4.1.4 只打印最后一行 4.2 进行行号范围区间的打印4.2.1 打印1-3行4.2.2 打印第二行到最后一行4.2.3 打印2-…

操作符续(整型提升与算术转换)

&#x1f929;本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 &#x1f970;内容专栏&#xff1a;这里是《C知识系统分享》专栏&#xff0c;笔者用重金(时间和精力)打造&#xff0c;基础知识一网打尽&#xff0c…

MySQL深入浅出: order by if()与order by in()之条件排序

目录 1&#xff1a;原表数据 2&#xff1a;order by if&#xff08;&#xff09; 3&#xff1a;order by in&#xff08;&#xff09; 4&#xff1a;社区地址 1&#xff1a;原表数据 2&#xff1a;order by if&#xff08;&#xff09; SELECT * FROM people ORDER BY IF(…

mysql数据库的表的增删查改

目录 表的增删查改 6.1&#xff1a;增加 6.2&#xff1a;查找 6.3&#xff1a;更新 6.4&#xff1a;删除 6.5&#xff1a; 插入查询结果 6.6&#xff1a;聚合函数 6.7&#xff1a;group by分组 关键字的先后顺序&#xff1a;from > on> join > where > gro…

C语言实现三子棋小游戏

目录 游戏介绍 游戏菜单的创建&#xff08;menu&#xff09; 游戏核心功能实现 棋盘的初始化&#xff08;InitBoard&#xff09; 棋盘的展现&#xff08;printfboard&#xff09; 玩家下棋&#xff08;playerBoard&#xff09; 电脑下棋&#xff08;computerBoard&#…

探究C++构造函数及其优化

目录 一、 类的六个默认成员函数1.1 框架图1.2 具体介绍&#xff08;1&#xff09;构造函数&#xff08;2&#xff09;析构函数&#xff08;3&#xff09;拷贝构造函数&#xff08;4&#xff09;赋值运算符重载函数 归纳我们不写&#xff0c;编译器默认生成了什么&#xff1a; …

2023宁波市赛 天一永安杯赛前模拟题部分wp

Web pop 进hint.php 伪协议读index.php <?php class Tiger{public $string;protected $var;// 恶意参数public function __construct($var){$this->var $var;}public function __toString(){return $this->string;}public function boss($value){// 0eval($valu…

自动化测试作为软件测试的一种技术手段,时常被大家讨论

自动化测试作为软件测试的一种技术手段&#xff0c;时常被大家讨论。本人在自动化技术方面有过略有小成&#xff0c;今天聊一聊关于自动化的一些误区&#xff0c;以帮助新手能正确的了解和认识自动化一些概念。 测试的行为本质是什么&#xff1f; 为什么先从这个概念开始谈起&…