RabbitMQ(15672) 消息中间件 NOTE

news2024/11/15 21:32:49

目录

1、初识 RabbitMQ 消息队列

1.1 MQ 四大核心概念

1.2 消息的发送(无交换机态)

1.3 关于消息自动重新入队

1.3.1 消息的常见应答方法(R)

1.4 关于 RabbitMQ 的持久化、不公平分发以及预取值

2、RabbitMQ 消息的发布确认

2.1 MQ的单个确认发布

2.2 MQ的批量确认发布

2.3 MQ的异步确认发布(重点)

3、关于 Exchanges 交换机

4、死信队列(重点)

5、延迟队列(整合SpringBoot)

6、备份交换机(重点)


 什么是 RabbitMQ ?

        RabbitMQ 是流行的消息队列服务软件,是开源的AMQP(高级消息队列协议)实现;支持多种客户端,如:Java、Python、C、PHP、Ruby、JavaScript等,用于在分布式系统中存储转发消息,可以实现异步处理、流量削峰、系统解耦,在易用性、扩展性、高可用等方面表现优异

1、初识 RabbitMQ 消息队列

1.1 MQ 四大核心概念

生产者:

产生数据发送消息的程序

交换机:

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中;交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

队列:

队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中;队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区;许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据;这就是我们使用队列的方式

消费者:

消费与接收具有相似的含义;消费者大多时候是一个等待接收消息的程序;生产者,消费者和消息中间件很多时候并不在同一机器上;同一个应用程序既可以是生产者又是可以是消费者

以下是 RabbitMQ 的原理图:

1.2 消息的发送(无交换机态)

这里使用MQ中间件进行简单的消息发送,大致流程图如下所示:

这里需要注意的是,当一次性有多条消息发送到队列时,这时需要多个消费者(工作线程),消费者进行消费信息是根据轮询的方式进行消费

创建一个Utils工具类,与 MQ 进行交互连接:

/**
 * 这里是与 MQ 交互的工具类
 */
public class RabbitMQUtils {

    public static Channel getChannel() throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();    //创建连接工厂

        factory.setHost("192.168.101.65");
        factory.setUsername("admin");
        factory.setPassword("123");

        Connection connection = factory.newConnection();    //创建连接

        return connection.createChannel();  //获取连接信道
    }
}

【消息生产者】代码如下所示:

/**
 * 生产者
 */
public class Produce {

    public static final String QUEUE_NAME = "hello";    //队列名称

    public static void main(String[] args) throws IOException, TimeoutException {

        //这里创建一个工厂,与 RabbitMQ 进行交互
        Channel channel01 = RabbitMQUtils.getChannel();

        //1.队列名称  2.队列是否持久化  3.消息是否供多个消费者消费  4.消息是否自动删除  5.其他参数
        channel01.queueDeclare(QUEUE_NAME,false,false,false,null);


        String message = "hello mq";    //发消息

        //1.对应的交换机  2.路由的KEY值(本次是队列名)   3.其他参数  4.发送消息的消息体
        channel01.basicPublish("",QUEUE_NAME,null,message.getBytes());

        System.out.println("消息发送完毕!");

    }
}

消息栏:

RabbitMQ 中(以上创建的 hello 队列):

【消息消费者】代码如下所示:

/**
 * 消费者
 */
public class Consumer {

    public static final String QUEUE_NAME = "hello";    //要进行消费消息的队列

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建连接工厂,与MQ进行交互
        Channel channel = RabbitMQUtils.getChannel();

        //接收消息的回调
        DeliverCallback deliverCallback = (consumerTag,message)->{

            System.out.println("成功接收消息:"+new String(message.getBody()));    //接收其消息的消息体才能显示对应的消息
        };
        
        //取消消息时的回调
        CancelCallback cancelCallback = (consumerTag) ->{

            System.out.println(consumerTag + "消费者的消息被中断!");
        };
        
        /**
         * 1.要被消费信息的队列
         * 2.消费成功之后是否需要自动应答
         * 3.消费成功时的回调
         * 4.取消消息发送时的回调
         */
        //消费者消费信息
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

消息栏:

MQ 中的消息已经被消费:


1.3 关于消息自动重新入队

        如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队;如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者;这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息

1.3.1 消息的常见应答方法(R)

  • Channel.basicAck (用于肯定确认) RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

  • Channel.basicNack (用于否定确认)

  • Channel.basicReject (用于否定确认) 与 Channel.basicNack 相比少一个参数

        不处理该消息了直接拒绝,可以将其丢弃了

丢失的消息重新入队,传递给正常工作的消费者进行消费的大致图:

由于生产者的代码没有改变,这里就不写了,以下是消费者(两个消费者只有 sleep 的时间不一样)关于 ACK 手动应答消息的代码:

/**
 * 这里是消费者手动接受消息 ACK,使发送失败的消息重新排队
 */
public class Consumer01 {

    public static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        SleepUtils.sleep(8);   //模拟消息多的情况

        //1、接收到消息的回调
        DeliverCallback deliverCallback = (consumerTag,message) ->{

            System.out.println("接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));

            /**
             * 手动应答
             * 1. 消息的标记
             * 2. 是否批量应答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        //2.消息中断的回调
        CancelCallback  cancelCallback = (consumerTag) -> {

            System.out.println(consumerTag + "消费者取消了消息的接收!");
        };

        //3.使用手动应答
        boolean autoACK = false;
        channel.basicConsume(QUEUE_NAME,autoACK,deliverCallback,cancelCallback);
    }
}

首先创建两个消费者,分别为C1和C2,这里生产者连续发送四条消息: 

消费者一 处于正常状态消费者二 接收了一条消息后就宕机了,这时,消费者一 将发送失败的消息从信道中取出并进行消费,结果图如下所示:

消费者一:         消费者二:

可见, 就算消费者二突然宕机,RabbitMQ 依然采用轮询方式将发送失败的消息轮询给正常工作的消费者

1.4 关于 RabbitMQ 的持久化、不公平分发以及预取值

队列的持久化:

平时消息队列都是保存在内存中,若 RabbitMQ服务 突然停止,则之前的队列都会消失;所以,为了减少损失的可能性,通常将消息队列保存到磁盘上,即持久化

boolean durable =true;  //将队列进行持久化
        //1.队列名称  2.队列是否持久化  3.消息是否供多个消费者消费  4.消息是否自动删除  5.其他参数
        channel01.queueDeclare(QUEUE_NAME,durable,false,false,null);


 

消息的持久化:

将 MessageProperties.PERSISTENT_TEXT_PLAIN 标识放入 basicPublish消息发送方法的第三个参数中,以开启消息持久化

将消息标记为持久化并不能完全保证不会丢失消息;尽管它告诉 RabbitMQ 将消息保存到磁盘,但是,这里依然存在当消息刚准备存储在磁盘的时候,还没有存储完,消息还在缓存的一个间隔点;此时并没有真正写入磁盘,持久性保证并不强

while (sc.hasNext()){

            String message = sc.next();    //发消息
            //1.对应的交换机  2.路由的KEY值(本次是队列名)   3.其他参数  4.发送消息的消息体
            channel01.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

            System.out.println("消息发送完毕!");
        }

不公平分发(在消费者处开启):

相对于轮询分发,不公平分发采用能者多劳的策略,谁干的快消息就先给谁发送,避免慢进程拖慢整个服务的进度

预取值:

不公平分发的值设置为1,若设置的数值大于1,则表示为预取值;所谓的预取值,是设置消费者缓冲信道中最大存储的数量;

比如:消费者C1设置预取值为2,消费者C2设置预取值为5,假设有8条消息进来时,C1有可能消费了3条,因为已经消费的消息不算入“预取值”内;而C2信道中存入5条消息,若这五条消息即使还未被C2消费,C1也不能将其消费,因为这5条消息已经放入C2的信道中进行等待排队了

 MQ 中:

可见,这里明确标明了对应消费者的预取值


2、RabbitMQ 消息的发布确认

在设置发布确认时,一般有三个步骤:

设置队列的持久化  --->>  设置队列中的消息进行持久化  --->>   通过MQ将消息保存在磁盘上,然后MQ跟生产者说明一声 “已经保存在磁盘上”(这里就是消息确认)

2.1 MQ的单个确认发布

定义:

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布;如果没有确认发布的消息,就会阻塞所有后续消息的发布

这里是模拟消息单个确认的代码:

waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常(也可以不指明时间范围)

    /**
     * 单个消息确认发布
     */
    public static void SingleConfirmMessage () throws Exception{

        Channel channel = RabbitMQUtils.getChannel();

        String QUEUE_NAME = UUID.randomUUID().toString();

        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);  //开启队列的持久化

        //开启消息的发布确认
        channel.confirmSelect();

        long begin = System.currentTimeMillis();    //开始时间

        for (int i =0;i<MESSAGE_COUNT;i++){
            String message = i + "";

            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

            //消息单个确认,发送成功一次就确认一次
            boolean singleRes = channel.waitForConfirms();
            if(singleRes) {
                System.out.println("消息发布成功!");
            }
        }

        long end = System.currentTimeMillis();    //结束时间

        System.out.println("总耗时为:"+(end-begin)+"ms");
    }

运行结果:

可见,总耗时时间为 1269 ms,虽然保证了消息的可靠性,但是性能下来了,需要一条条确认

2.2 MQ的批量确认发布

定义:

这是也一种同步确认发布消息的方式;先发布一批消息,然后一起确认可以极大地提高吞吐量;当然这种方式的缺点就是:由于是批量确认发布,当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息,而后重新发布消息

这里是模拟批量确认发布的代码:

    /**
     * 批量消息确认发布
     */
    public static void BatchConfirmMessage() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();

        String QUEUE_NAME = UUID.randomUUID().toString();

        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);  //开启队列的持久化

        //开启消息的发布确认
        channel.confirmSelect();

        long begin = System.currentTimeMillis();    //开始时间

        int batchSize = 100;    //批量确认的长度

        for (int i=0;i<MESSAGE_COUNT;i++){
            String message = i + "";

            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

            //每发送一百条数据,就进行批量发布确认
            if(i % batchSize == 0){
                boolean batchRes = channel.waitForConfirms();
                if(batchRes) {
                    System.out.println("批量发送消息成功!");
                }
            }
        }

        long end = System.currentTimeMillis();    //结束时间
        System.out.println("总耗时为:"+(end-begin)+"ms");
    }

运行结果:

可见,总耗时为 199 ms, 相比于单个确认发布,在性能方面有了很大的提升,但是容错率相对来说就升高了,因为由于批量,很难确定是哪一条消息出现了错误

2.3 MQ的异步确认发布(重点)

定义:

很显然,这是一种异步确认发布消息的方式,异步虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说;它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功

大致流程图:

异步发送消息时,不需要等待当前消息经过确认后才能将之后的消息发送出去;我们要做的只是发布消息,其余的交给 broker 中间人处理;而最终的消息是否发布成功,取决于之后的回调确认消息的函数,由于每一个发出去的消息都有 KEY VALUE ,因此,我们能很快的找到对应发送失败的消息

存在问题:

由于监听器是在发布消息完成后执行的所以采用一般的方法是检测不到发送失败的消息

解决方案:

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentLinkedQueue 这个队列在确认回调与发布线程之间进行消息的传递

ConcurrentLinkedQueue是Java中的一个并发队列实现,它提供了线程安全的无界队列(unbounded queue)功能;它是基于链表实现的队列,可以在多线程环境下高效地进行元素插入和删除操作

这里是模拟异步确认发布的代码:

   /**
     * 异步确认发布消息
     */
    public static void AsynchronousConfirmMessage() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();

        String QUEUE_NAME = UUID.randomUUID().toString();

        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);  //开启队列的持久化

        //1.开启消息的发布确认
        channel.confirmSelect();


        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况
         * 【优点】:
         * 1.将消息与对应的序号相关联
         * 2.批量的根据序号删除条目
         * 3.支持高并发
         */
        ConcurrentSkipListMap<Long,String> concurrentSkipListMap
                = new ConcurrentSkipListMap<>();

        //2.这里准备消息监听器,以便于监听消息的成功与否 (delivery:消息的编号,multiple:用来判断是否为批量)
        ConfirmCallback ACK_callback = (deliveryTag,multiple) ->{   //消息确认成功 回调函数

            //2.2【第二步】若是批量发消息,则进行批量的删除
            //【注意这里只有已经确认的消息,不会干扰到未确认的消息】
            if(multiple) {
                ConcurrentNavigableMap<Long, String> concurrentNavigableMap
                        = concurrentSkipListMap.headMap(deliveryTag);

                concurrentNavigableMap.clear();
            }else {
                //2.3 若是单个发消息,则单个删除
                concurrentSkipListMap.remove(deliveryTag);
            }

            log.info("确认的消息编号:"+deliveryTag);
        };

        ConfirmCallback NACK_callback = (deliveryTag,multiple)->{   //消息确认失败 回调函数

            log.error("未确认的消息编号:"+deliveryTag);
        };

        channel.addConfirmListener(ACK_callback,NACK_callback); //将以上回调确认函数添加到监听器中


        long begin = System.currentTimeMillis();    //开始时间

        //【第一步】这里为模拟消息的发送
        for (int i=0;i<MESSAGE_COUNT;i++){

            String message = i +"";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

            //3.记录消息的总和,往里面存入信道的序号以及对应序号的信息
            //channel.getNextPublishSeqNo() 表示获取当前消息的下一个消息编号
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
        }

        long end = System.currentTimeMillis();    //结束时间
        System.out.println("总耗时为:"+(end-begin)+"ms");
    }

关于其中使用 concurrentSkipListMap.headMap(deliveryTag) 进行批量删除的解释说明:

批量删除的目的是在消息确认成功时,删除已经确认的消息,以便只保留未确认的消息。为了实现这一点,代码使用了 headMap(deliveryTag)方法来获取 concurrentSkipListMap 中小于 deliveryTag的所有键值对子集

这是为什么使用 headMap(deliveryTag)的原因:

  1. ConcurrentSkipListMap是一个有序映射(按照键的自然顺序排序),并且支持高效的范围查询;headMap(deliveryTag)方法返回一个新的ConcurrentNavigableMap,该映射包含了所有键小于deliveryTag的键值对子集

  2. 批量确认通常涉及到确认多个消息,这些消息的deliveryTag可能是连续的;例如,如果要确认deliveryTag为1到5的消息,那么可以使用headMap(deliveryTag)来一次性获取deliveryTag小于6的所有消息;这样,可以高效地删除多个消息而不需要单独操作每个消息

  3. 使用 concurrentSkipListMap.headMap(deliveryTag)可以确保只删除已确认的消息,而不会影响未确认的消息

批量删除的实现方法非常简单,只需调用concurrentNavigableMap.clear(),它会清除concurrentNavigableMap中的所有键值对,这些键值对对应于已确认的消息

总结:使用 headMap(deliveryTag)来获取小于 deliveryTag的消息是为了高效地批量删除已确认的消息;批量删除的方法是调用 clear()来清除这些已确认的消息,以确保只保留未确认的消息在 concurrentSkipListMap中;这有助于有效地管理消息状态并节省操作的开销

运行结果:

可见,异步确认发布消息效率比以上两种方式都要高,由于是异步发送的消息,所以顺序会很不一致


3、关于 Exchanges 交换机

四种 MQ 交换机(未按先后排序):

  • 直连交换机(Direct Exchange): 直连交换机是最简单的交换机类型,它通过将消息的路由键(Routing Key)与绑定键(Binding Key)进行精确匹配来进行消息路由;消息生产者将消息发送到具有匹配路由键的交换机上,然后交换机将消息传递给与绑定键匹配的队列

【消息生产者】

public static final String Exchange_Name = "exchange_direct"; //交换机

    public static void main(String[] args) throws Exception{

        Channel channel = RabbitMQUtils.getChannel();

        //1.声明一个交换机
        channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);

        Scanner sc = new Scanner(System.in);

        while (sc.hasNext()){
            String message = sc.next();
            channel.basicPublish(Exchange_Name,"error",null,message.getBytes());
            System.out.println("消息发送成功!");
        }
    }

【消息消费者一】

public static final String Exchange_Name = "exchange_direct"; //交换机
    public static final String QUEUE_NAME = "disk";  //队列名称

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        //1.声明一个交换机
        channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);

        //2.声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //3.将交换机与队列进行绑定
        channel.queueBind(QUEUE_NAME,Exchange_Name,"error");
        System.out.println("【消费者二】等待接收信息...");

        //4.对应的回调函数
        DeliverCallback deliverCallback = (consumerTag,message)->{

            System.out.println("【消费者二】接收到信息:"+new String(message.getBody(), StandardCharsets.UTF_8));
        };

        CancelCallback cancelCallback = (consumerTag)->{

            System.out.println(consumerTag+"【消费者二】取消了消息的发送");
        };

        //5.使用自动应答
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

【消息消费者二】

这里是消费者二设置的 Routing Key

//3.将交换机与队列进行绑定
 channel.queueBind(QUEUE_NAME,Exchange_Name,"info");
 channel.queueBind(QUEUE_NAME,Exchange_Name,"warning");

 

 运行结果:

可见,消息生产者将消息发送到对应的 Routing Key 处,即消费者二,而消费者一没有接收到消息


  • 主题交换机(Topic Exchange): 主题交换机允许使用 “通配符匹配的方式” 进行消息路由,其中路由键可以使用 "*" 代表单个单词,"#" 代表多个单词;它能够根据消息的路由键模式与队列的绑定键模式进行匹配,以实现更灵活的消息路由

需要注意的是:

  1. 当一个队列绑定键是 “ # ” ,那么这个队列将接收所有数据,类似 FANOUT交换机
  2. 如果队列绑定键中没有 “*” 和 “#” 出现,则类似 DIRECT交换机

由此可发现,TOPIC交换机包括了扇出和直接交换机,功能更加强大😎

【消息生产者】

这里使用 Map 集合将绑定键以及对应的消息进行封装,然后再依次遍历取出

public static final String Exchange_Name = "exchange_topic"; //交换机

public static void main (String[] args) throws Exception {

    try (Channel channel = RabbitUtils.getChannel()) {

        //1.声明一个交换机
        channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.TOPIC);

        /**
         * Q1-->绑定的是
         * 中间带 orange 带 3 个单词的字符串(*.orange.*)
         * Q2-->绑定的是
         * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
         * 第一个单词是 lazy 的多个单词(lazy.#)
         *
         */
        Map<String, String> bindingKeyMap = new HashMap<>();

        bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

        //2.map 中进行 key-value 值的遍历操作,依次放入信道中进行消息的发送
        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {

            String bindingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();

            channel.basicPublish(Exchange_Name, bindingKey, null,
                    message.getBytes("UTF-8"));
            System.out.println("生产者发出消息" + message);
        }
    }
}

【消息消费者一】

public static final String Exchange_Name = "exchange_topic"; //交换机

    public static final String QUEUE_NAME = "Q1";  //队列名称

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        //1.声明一个交换机
        channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.TOPIC);

        //2.声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //3.将交换机与队列进行绑定
        channel.queueBind(QUEUE_NAME,Exchange_Name,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME,Exchange_Name,"lazy.#");
        System.out.println("【消费者一】等待接收信息...");

        //4.对应的回调函数
        DeliverCallback deliverCallback = (consumerTag,message)->{

            System.out.println("【消费者一】接收到信息:"+new String(message.getBody(), StandardCharsets.UTF_8));
        };

        CancelCallback cancelCallback = (consumerTag)->{

            System.out.println(consumerTag+"【消费者一】取消了消息的发送");
        };

        //5.使用自动应答
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

【消息消费者二】

只有绑定键与消费者一不同

//3.将交换机与队列进行绑定
 channel.queueBind(QUEUE_NAME,Exchange_Name,"*.orange.*");

运行结果:

消费者一:消费者二:


  • 扇形交换机(Fanout Exchange): 扇形交换机将消息广播到所有绑定到该交换机上的队列中,无需考虑路由键的匹配;无论绑定键的数量和绑定队列的数量如何,扇形交换机都会将消息复制并发送到所有队列中

【消息生产者】

这里只展示需要另外添加的部分语句       

//1.声明一个交换机
        channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.FANOUT);

【消息消费者】

这里只展示需要另外添加的部分语句 

   //1.声明一个交换机
     channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.FANOUT);

   //2.声明队列
     channel.queueDeclare(QUEUE_NAME,false,false,false,null);

   //3.将交换机与队列进行绑定
     channel.queueBind(QUEUE_NAME,Exchange_Name,"");


  • 标题交换机(Headers Exchange): 标题交换机使用消息的标头(Headers)进行匹配,并根据标头的匹配结果将消息路由到相关的队列;它不依赖于路由键或绑定键,而是使用消息的标头信息进行匹配过滤


4、死信队列(重点)

定义:死信,顾名思义,死掉的信息,即无法被消费的消息;由于存在有该类型的消息,所以对应保存该类型的队列随即产生,即死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中例:用户在商城下单成功并点击去支付后,在指定时间未支付,则自动失效

死信队列大致流程图:

  • 正常情况下,生产者(producer)将消息通过普通交换机(normal_exchange)所绑定的普通队列(normal_queue)发送到消费者 C1
  • 而异常情况下,将异常的消息保存到死信队列(dead_queue),并发送到消费者 C2

【消费者C1】

C1需要不仅需要处理正常消息的发送,还需要处理失效消息往死信队列中的传递,以及普通队列与死信队列之间的绑定关系

/**
 * 模拟死信队列
 *
 * 消费者 C1
 */
public class Receive01 {

    public static final String Normal_Exchange = "normal_exchange"; //普通交换机

    public static final String Dead_Exchange = "dead_exchange"; //死信交换机

    public static final String Normal_Queue = "normal_queue"; //普通队列

    public static final String Dead_Queue = "dead_queue"; //死信队列

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        //1.声明死信以及普通队列的交换机
        channel.exchangeDeclare(Normal_Exchange,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(Dead_Exchange,BuiltinExchangeType.DIRECT);

        //2.声明死信队列以及普通队列
        HashMap<String, Object> deadLetters = new HashMap<>();
        //2.1 普通队列设置死信交换机(注意:这里的 KEY 是固定的)
        deadLetters.put("x-dead-letter-exchange",Dead_Exchange);
        //2.2 设置死信的 Routing Key
        deadLetters.put("x-dead-letter-routing-key","list");
        //2.3 设置过期时间(这里不进行设置)
//        deadLetters.put("x-message-ttl",10000);

        channel.queueDeclare(Normal_Queue,false,false,false,deadLetters);
        channel.queueDeclare(Dead_Queue,false,false,false,null);

        //3.将队列与交换机进行绑定
        channel.exchangeBind(Normal_Queue,Normal_Exchange,"zhangsan");
        channel.exchangeBind(Dead_Queue,Dead_Exchange,"lisi");

        System.out.println("等待接收消息中.....");

        //4.接收到消息的回调
        DeliverCallback deliverCallback = (consumerTag,message) ->{

            System.out.println("消费者一接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };

        //5.消息中断的回调
        CancelCallback  cancelCallback = (consumerTag) -> {

            System.out.println(consumerTag + "消费者取消了消息的接收!");
        };

        channel.basicConsume(Normal_Queue,true,deliverCallback,cancelCallback);
    }
}

【生产者Producer】

这里模拟的是消息被拒

生产者不需要管理队列的消息是否发送成功,只需要将消息发送到普通队列中

public static final String Normal_Exchange = "normal_exchange"; //普通交换机

    public static final int MESSAGE_COUNT = 10; //消息的总数

    public static final String TTL_TIME = "10000"; //过期时间

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        //1.声明一个交换机
        channel.exchangeDeclare(Normal_Exchange, BuiltinExchangeType.DIRECT);

        //2.死信消息,设置 TTL 消息过期时间,过期则传送到死信队列中
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties()
                        .builder().expiration(TTL_TIME).build(); //10s

        //3.这里进行依次发送消息,同时设置了消息过期时间
        for (int i=0;i<=MESSAGE_COUNT;i++){
            String message = "info"+i;

            channel.basicPublish(Normal_Exchange,"zhangsan",properties,message.getBytes());

            System.out.println("消息生产者发送消息:"+message);
        }
    }

【消费者C2】

消费者C2的任务是只需要消费死信队列中的消息

public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        //1.声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        System.out.println("消费者二等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag,message)->{

            System.out.println("消费者二接收到消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
        };

        CancelCallback cancelCallback = (consumerTag)->{

            System.out.println(consumerTag+"消费者中断了消息..");
        };

        //2.消费死信队列中的消息
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
    }

运行结果:

MQ 中:

这里,进行中断普通队列的接收,模拟死信队列场景

当死信队列中存在未被消费的消息时,C2感应到存在的消息,并将之前发送失败的消息进行消费


5、延迟队列(整合SpringBoot)

定义:

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理;简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

延迟队列一般使用的场景:

  1. 订单在十分钟之内未支付则自动取消
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

代码架构图:

创建两个队列 QAQB,两者队列 TTL 分别设置为 10S40S,然后在创建一个交换机 X 和死信 交换机 Y,它们的类型都是 Direct,创建一个 死信队列 QD,它们的绑定关系如下:

当然,这里进行整合 SpringBoot 进行使用 🔜

【这里是 application.yaml 文件中的配置】

spring:
  #这里是 RabbitMQ 的配置
  rabbitmq:
    port: 5672  #指定的 rabbitMQ 服务器端口号
    username: admin
    password: 123
    host: 192.168.101.65

【这里是延迟队列队列以及交换机的配置类】

这里是图中的中间那一段,也就是队列以及交换机的声明以及绑定

/**
 * 延迟队列中【队列】与【交换机】的 "声明" 以及 "绑定" 的配置类
 */
@Configuration
public class TTLQueueConfig {

    //普通交换机
    public static final String Normal_Exchange = "X";
    //死信交换机
    public static final String Dead_Exchange = "Y";
    //普通队列
    public static final String Normal_QueueA = "QA";
    public static final String Normal_QueueB = "QB";
    //死信队列
    public static final String Dead_Queue = "QD";


    //1.声明交换机
    //1.1声明普通交换机
    @Bean("Normal_Exchange")
    public DirectExchange Normal_Exchange(){

        return new DirectExchange(Normal_Exchange);
    }
    //1.2 声明死信交换机
    @Bean("Dead_Exchange")
    public DirectExchange Dead_Exchange(){

        return new DirectExchange(Dead_Exchange);
    }

    //2.声明普通队列,与死信交换机进行绑定,并声明过期时间
    @Bean("Normal_QueueA")  //【队列A】
    public Queue QA(){

        HashMap<String, Object> arguments = new HashMap<>(2); //这里初始化 map 的长度,加快编译速度
        //2.1 设置死信交换机
        arguments.put("x-dead-letter-exchange",Dead_Exchange);
        //2.2 设置死信 Routing Key
        arguments.put("x-dead-letter-routing-key","YD");
        //2.3 设置 TTL 过期时间
//        arguments.put("x-message-ttl",10000);

        return QueueBuilder
                .durable(Normal_QueueA) //开启队列持久化
                .withArguments(arguments).ttl(10000).build();
    }

    @Bean("Normal_QueueB")  //【队列B】
    public Queue QB(){

        HashMap<String, Object> arguments = new HashMap<>(2); //这里初始化 map 的长度,加快编译速度
        arguments.put("x-dead-letter-exchange",Dead_Exchange);
        arguments.put("x-dead-letter-routing-key","YD");
//        arguments.put("x-message-ttl",30000);

        return QueueBuilder
                .durable(Normal_QueueB)
                .withArguments(arguments).ttl(30000).build();
    }

    //3.声明死信队列
    @Bean("Dead_Queue")
    public Queue QD(){

        return QueueBuilder
                .durable(Dead_Queue).build();
    }


    //4.将普通交换机与队列A进行绑定
    @Bean
    public Binding QA_Binding_NormalQueue(@Qualifier("Normal_QueueA") Queue queueA,
                                          @Qualifier("Normal_Exchange") DirectExchange normalExchange){

        return BindingBuilder.bind(queueA)
                .to(normalExchange).with("XA");
    }
    //4.1将普通交换机与队列B进行绑定
    @Bean
    public Binding QB_Binding_NormalQueue(@Qualifier("Normal_QueueB") Queue queueB,
                                          @Qualifier("Normal_Exchange") DirectExchange normalExchange){

        return BindingBuilder.bind(queueB)
                .to(normalExchange).with("XB");
    }
    //4.2将死信交换机与死信队列进行绑定
    @Bean
    public Binding QD_Binding_DeadExchange(@Qualifier("Dead_Queue")Queue deadQueue,
                                           @Qualifier("Dead_Exchange")DirectExchange deadExchange){

        return BindingBuilder.bind(deadQueue)
                .to(deadExchange).with("YD");
    }

}

【消息生产者】

这里打算发送一个消息请求,分别给不同的 TTL 队列

/**
 * 消息生产者
 *
 * 这里进行发送 http://localhost:8080/ttl/sendMsg/小白
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Resource
    RabbitTemplate rabbitTemplate;

    //发送消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable("message")String message){

        log.info("当前时间:{},发送了一条信息:{}给两个 TTL 队列",new Date(),message);

        rabbitTemplate.convertAndSend("X","XA","消息来自10s的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自30s的队列:"+message);
    }
}

【消息消费者】

使用 Listener 监听器进行死信队列的监听

/**
 * 这里是消息的消费者
 */
@Slf4j
@Component
public class DeadLetterConsumer {


    //死信队列中进行接收 TTL 延迟消息
    @RabbitListener(queues="QD")    //调用监听器监听死信队列
    public void DeadQueue_consumer(Message message, Channel channel){

        String msg = new String(message.getBody());
        log.info("当前时间:{}"+"消费者死信队列接收到消费的消息:{}",new Date().toString(),msg);
    }
}

运行结果:

发现问题🤔:

若以后需要多个不同的 TTL 消息,那么就需要建立多个消息队列,以达到传递不同 TTL 的消息;这样导致耦合度升高,不符合开闭原则,所以接下来进行延迟队列的优化

解决问题:

新增一个 QC 队列,这个队列不设置延迟时间,而是让 Producer消息生产者 发送消息的时候进行设置消息的 TTL 时间,这样,就不用频繁改动队列的 TTL 时间

做绑定的代码跟上面一样,不做展示,这里是生产者的代码,进行设置发送消息的 TTL时间:

 @GetMapping("/sendMsg/ttl/{message}/{ttlTime}")
    public void QC_sendMsgByTTL(@PathVariable("message") String message,@PathVariable("ttlTime") String ttlTime){

        log.info("当前时间:{},发送了一条消息给QC:{}",new Date().toString(),message);

        rabbitTemplate.convertAndSend("X","QC",message,msg->{

            //这里进行设置消息的 TTL 时间
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

运行结果(这里使用的是低版本的 RabbitMQ)

由结果可知,由于队列的先进先出特性,先发的 TTL消息若时间设置大于后发的 TTL消息,那么,后发的消息就会被堵塞,直到先发的 TTL消息发送完毕,后发的 TTL 消息才能继续发送,这是一个弊端

这里需要注意的是:版本高一些的 RabbitMQ 已经修复了以上问题


6、备份交换机(重点)

定义:

在消息无法被路由到任何队列时,将这些消息发送到备份交换机指定的交换机中,而不是直接丢弃这些消息;这增加了消息传递的可靠性,可以用于处理路由失败或未匹配的消息,确保消息不会因无法路由而丢失,并允许进行相应的处理或日志记录;备份交换机通常用于处理消息路由失败的情况,以提供额外的消息处理机制

大致流程图如下:

【application.yaml配置类】

这里需要手动的开启消息回调与失败消息的回退

spring:
  #这里是 RabbitMQ 的配置
  rabbitmq:
    port: 5672  #指定的 rabbitMQ 服务器端口号
    username: admin
    password: 123
    host: 192.168.101.65

    #这里进行开启发布确认以及消息的回调
    publisher-confirm-type: correlated

    #将发送失败的消息回退给生产者
    publisher-returns: true

【消息回调接口】

  • 由于 MyCallBack 方法重写了 RabbitTemplate 接口中的 ConfirmCallback 以及 ReturnCallback 方法;需要将 RabbitTemplate 对象进行注入,才能进行调用自己重写的RabbitTemplate 内部方法
  • 其中,@PostConstruct   是一个在Spring Bean初始化完成后执行的注解init()方法 被标记为 @PostConstruct,因此它将在该Bean的初始化阶段被调用

需要注意的是:RabbitTemplate 注入必须在 init 前,不然会报未注入异常

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Resource
    RabbitTemplate rabbitTemplate;


    /**
     * 由于这里重写继承接口中的方法,所以需要进行注入操作
     */
    @PostConstruct
    public void init(){

        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }


    /**
     * 交换机回调信息的接口
     * @param correlationData 保存回调消息的 id 以及相关的信息
     * @param Ack_Message 消息确认
     * @param reason 消息发送失败,回调的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean Ack_Message, String reason) {

        String messageId = "";

        if (ObjectUtil.isNotNull(correlationData)) { //判断是否为空,防止空指针异常

            messageId = correlationData.getId();
        }

        if (Ack_Message) {
            log.info("成功接收到消息,消息ID为:{}", messageId);
        } else {
            log.info("接收消息失败,消息ID为:{},失败的原因为:{}", messageId, reason);
        }
    }


    /**
     * 将发送失败的消息进行回退
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {

        log.error("消息“:{} 被交换机:{} 退回,退回的原因:{},消息的 RoutingKey:{}",
                new String(returnedMessage.getMessage().getBody()),
                returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1048006.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Timer/计时器发布者与订阅者, Subscribar/文本框订阅模式 的使用

1. Timer 计时器的操作 1.1 实现 /// 计时器 发布者与订阅者 struct TimerBootcamp: View {// 计时器 发布者 timer1/timer3/timer5 1.0 , timer2 2.0 , timer4 0.5let timer Timer.publish(every: 1, on: .main, in: .common).autoconnect()// ---1---// 计算性属性 当前…

线性代数(七) 矩阵分析

前言 从性线变换我们得出&#xff0c;矩阵和函数是密不可分的。如何用函数的思维来分析矩阵。 矩阵的序列 通过这个定义我们就定义了矩阵序列的收敛性。 研究矩阵序列收敛性的常用方法&#xff0c;是用《常见向量范数和矩阵范数》来研究矩阵序列的极限。 长度是范数的一个特…

C语言实现---通讯录

通讯录 前言1.初始化通讯录2.SHOW展示通讯录3.ADD添加联系人的函数4.DEL删除联系人的函数5.SEARCH查找联系人的函数6.MODIFY修改联系人的函数7.销毁通讯录8.整体代码展示1.头文件"contact.h"2.源文件"contact.c"3.测试文件"test.c" 前言 那么好…

解决键盘长按才有反应

问题分析&#xff1a; 排除键盘自身没有问题&#xff0c;那就应该是电脑设置的问题&#xff0c;目前我遇到的不小心长按shift键设置了【筛选键】 解决方式&#xff1a; 打开设置搜索筛选建&#xff0c;关闭&#xff0c;粘带键也会出现类似问题。

【go语言】结构体

结构体 结构体定义 type name struct{value1 type1value2 type2...... }组成结构体的数据称为字段&#xff0c;每一个字段有一个类型和一个名字&#xff0c;每一个字段的名字必须唯一&#xff0c;字段的类型任意。 创建结构体 type myStruct struct{i1 intf1 float32str st…

【软件测试】开发/测试模型

开发/测试模型 瀑布模型 设计&#xff1a;技术文档(设计那些接口&#xff0c;库表&#xff0c;mq&#xff0c;定时任务)&#xff0c;UI视觉稿 特点&#xff1a;线性的结构。 优点&#xff1a;每个阶段做什么&#xff0c;产出什么非常清晰 缺点&#xff1a;测试人员介入太晚…

了解CISP,看这篇文章就够了,附上CISP证书题库资料

前言 【点击此处领取CISP题库资料和网络安全学习资】 什么是CISP&#xff1f; CISP是中国信息安全测评中心依据中编办赋予的职能&#xff0c;建立和发展的一整套完整的信息安全保障人才培训体系&#xff0c;从2002年开始启动。 CISP (CertifiedInformation Security Profes…

分享55个C源码源代码总有一个是你想要的

分享55个C源码源代码总有一个是你想要的 链接&#xff1a;https://pan.baidu.com/s/1_zbaunqvmYRhCiX7hbiqmg?pwd8888 提取码&#xff1a;8888 1. 项目名称 apachesubversion版本控制系统 v1.10.3 Ceph分布式文件系统 v17.2.5 clip命令行插图处理器 v0.8 curve分布式存…

在前端设计中,子元素的基线和父元素的基线分别是什么意思?并利用Bootstrap的类align-items-baseline实现子元素在其父容器内基线对齐。

子元素的基线和父元素的基线是用于文本对齐的重要概念。让我解释一下它们分别指的是什么&#xff1a; 子元素的基线&#xff08;Baseline of Child Elements&#xff09;&#xff1a; 子元素的基线是指子元素内文本的底部边缘&#xff0c;特别是字母的底部边缘。在包含文本的元…

医药行业电力供应3大难题?教你如何破解

电力是现代社会不可或缺的基础设施之一&#xff0c;它支持着工业、商业和生活的各个方面。在这个数字化、电气化的时代&#xff0c;电力配电系统扮演着关键的角色&#xff0c;确保电能以可靠、高效、安全的方式分发到我们的家庭、企业和工厂。 然而&#xff0c;要保持电力分配的…

LeetCode 518.零钱兑换II 动态规划 + 完全背包 + 组合数

给你一个整数数组 coins 表示不同面额的硬币&#xff0c;另给一个整数 amount 表示总金额。请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额&#xff0c;返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带符号整数。 示例…

DevOps持续集成与交付

概述 Jenkins是一个支持容器化部署的、使用Java运行环境的开源软件&#xff0c;使用Jenkins平台可以定制化不同的流程与任务、以自动化的机制支持DevOps领域中的CI与CD&#xff0c;在软件开发与运维的流程中自动化地执行软件工程项目的编译、构建、打包、测试、发布以及部署&a…

FileManager/本地文件增删改查, Cache/图像缓存处理 的操作

1. FileManager 本地文件管理器&#xff0c;增删改查文件 1.1 实现 // 本地文件管理器 class LocalFileManager{// 单例模式static let instance LocalFileManager()let folderName "MyApp_Images"init() {createFolderIfNeeded()}// 创建特定应用的文件夹func cr…

在英文电脑系统中,中文显示??????

如果操作系统是英文的&#xff0c;那么无论是在cmd界面&#xff0c;还是在Visual Studio的调试界面&#xff0c;中文显示都是一串问号??????? 这是因为在英文系统中&#xff0c;Console 的默认代码页是 437(OEM -United States)&#xff0c;不支持中文输入输出&#xff…

照片后期处理软件DxO FilmPack 6 mac中文说明

DxO FilmPack 6 for Mac是一款照片后期处理软件。它可以模拟超过60种著名胶片品牌和类型的色彩和颗粒感&#xff0c;使照片具有复古、艺术和时尚风格。 ​DxO FilmPack 6 mac支持RAW和JPG格式的照片&#xff0c;并提供丰富的调整选项&#xff0c;如亮度、对比度、曝光、阴影和高…

十六.镜头知识之工业镜头的质量判断因素

十六.镜头知识之工业镜头的质量判断因素 文章目录 十六.镜头知识之工业镜头的质量判断因素1.分辨率(Resolution)2.明锐度(Acutance)3.景深(DOF)&#xff1a;4. 最大相对孔径与光圈系数5.工业镜头各参数间的相互影响关系5.1.焦距大小的影响情况5.2.光圈大小的影响情况5.3.像场中…

【Java 进阶篇】深入理解SQL查询语言(DQL)

SQL&#xff08;Structured Query Language&#xff09;是一种用于管理关系型数据库的强大编程语言。它提供了各种命令和语句&#xff0c;用于执行各种操作&#xff0c;包括数据查询、插入、更新和删除。本文将深入探讨SQL查询语言&#xff08;DQL&#xff09;&#xff0c;它是…

Bootstrap的弹性盒子布局学习笔记

Bootstrap的弹性盒子布局学习笔记 目录 01-综述02-利用类d-flex与类d-inline-flex将容器定义为弹性盒子03-对弹性容器的的元素在水平方向上进行排列顺序设置03-对弹性容器的的元素在垂直方向上进行排列顺序设置04-弹性盒子内所有元素在主轴方向上的对齐方式05-1-弹性盒子内各行…

myArm 全新七轴桌面型机械臂

引言 在不断演进的科技世界中&#xff0c;我们始终追求创新和卓越&#xff0c;以满足客户的需求并超越他们的期望。今天&#xff0c;我们很高兴地宣布我们的最新产品——myArm 300 Pi&#xff0c;一款七轴的桌面型机械臂。这款产品的独特之处在于其灵活性和可编程性&#xff0c…

16. Seata 分布式事务

Spring Cloud 微服务系列文章&#xff0c;点击上方合集↑ 1. 简介 Seata 是一款开源的分布式事务解决方案&#xff0c;致力于提供高性能和简单易用的分布式事务服务。 事务是保障一系列操作要么都成功&#xff0c;要么都失败。就比如转账&#xff1a;A转账100元给B&#xff0…