消息队列(一)-- RabbitMQ入门(2)

news2024/12/26 16:09:12

发布确认

  • 发布确认原理
    生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。此外 broker 也可以设置 basic.ack的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
    confirm 模式最大的好处在于它是异步的。一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
  • 开启发布确认的方法
    发布确认默认是没有开启的,开启需要调信道 confirmSelect 方法。
    在这里插入图片描述
  • 单个发布确认
    同步确认发布的方式,也就是发布一个消息之后被确认发布后,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,若在指定的时间范围内没有确认则抛出异常。
    缺点:发布速度特别慢,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
    //单个发布确认
    public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();
        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i+"";
            channel.basicPublish("", queueName, null, message.getBytes());
            //单个消息就马上进行发布确认
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("消息发送成功");
            }

        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("单独确认耗时:"+(end-begin)+"ms");
    }

在这里插入图片描述

  • 批量发布确认
    先发布一批消息后,一起确认可以极大提高吞吐量。缺点:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案也是同步的,也一样阻塞消息的发布。
    //批量发布确认
    public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();
        //批量确认消息大小
        int batchSize = 100;
        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i+"";
            channel.basicPublish("", queueName, null, message.getBytes());
            //判断达到100条信息的时候批量确认一次
            if((i+1)%batchSize==0){
                channel.waitForConfirms();
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("批量确认耗时:"+(end-begin)+"ms");
    }

在这里插入图片描述

  • 异步发布确认
    异步发布确认虽然编程逻辑比上面连个要复杂,但是性价比最高,无论是可靠性还是效率都没的说,它是利用回调函数来达到消息可靠性传递的。
    在这里插入图片描述
    //异步发布确认
    public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();
        //消息确认成功回调
        /**
         * 1.消息的标记
         * 2.是否为批量确认
         */
        ConfirmCallback ackCallback = (deliveryTag, multiple)->{
            System.out.println("确认的消息:"+deliveryTag);
        };
        //消息确认失败回调
        ConfirmCallback nackCallback = (deliveryTag, multiple)->{
            System.out.println("未确认的消息:"+deliveryTag);
        };
        /**
         * 准备消息的监听器 监听哪些消息成功了,哪些失败了
         */
        channel.addConfirmListener(ackCallback, nackCallback);
        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i+"";
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("异步确认耗时:"+(end-begin)+"ms");
    }

在这里插入图片描述

  • 如何处理异步未确认消息
    最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
    //异步发布确认
    public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);
        //开启发布确认
        channel.confirmSelect();
        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况下
         * 1.轻松的将序号与消息进行关联
         * 2.轻松批量删除条目
         * 3.支持高并发(多线程)
         */
        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

        //消息确认成功回调
        /**
         * 1.消息的标记
         * 2.是否为批量确认
         */
        ConfirmCallback ackCallback = (deliveryTag, multiple)->{
            //2.删除已经确认的消息,剩下的就是未确认的消息
            if(multiple){
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            }else{
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("确认的消息:"+deliveryTag);
        };
        //消息确认失败回调
        ConfirmCallback nackCallback = (deliveryTag, multiple)->{
            //3.打印一下未确认的消息都有哪些
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息:"+deliveryTag+":"+message);
        };
        /**
         * 准备消息的监听器 监听哪些消息成功了,哪些失败了
         */
        channel.addConfirmListener(ackCallback, nackCallback);
        //开始时间
        long begin = System.currentTimeMillis();
        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i+"";
            channel.basicPublish("", queueName, null, message.getBytes());
            //1.此处记录下所有要发送的消息 消息的总和
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("异步确认耗时:"+(end-begin)+"ms");
    }

交换机

上述工作队列中,每个任务交给一个消费者。如果将消息传达给多个消费者,这种模式叫“发布/订阅”。
在这里插入图片描述

Exchanges

RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。相反,生产者只能将消息发送到交换机(Exchange),交换机工作的内容就是接受来着生产者的消息,并把消息推入队列。交换机怎么处理消息,由交换机类型决定。

  • Exchange 类型
    直接(direct):也叫路由类型。
    主题(topic)
    标题(headers):也叫头类型,已经不常用了。
    扇出(fanout):就是发布/订阅模式。

  • 无名 Exchange
    默认交换机,通过空字符串(“”)进行标识。消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定key指定的,无名交换机时routingKey默认是队列名称。
    在这里插入图片描述

临时队列

未持久化的随机名称的队列,一旦断开了消费者的连接,队列将被自动删除。
String queueName = channel.queueDeclare().getQueue();
在这里插入图片描述

绑定(binding)

binding 就是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和哪个队列进行了绑定关系。下图中 X 与 Q1 和 Q2 进行了绑定。
在这里插入图片描述

Fanout 扇出(发布/订阅模式)

将接收到的所有消息广播到它知道的所有队列中。RoutingKey相同。系统中有个默认有 Fanout 类型 Exchange。
在这里插入图片描述

  • Fanout 实战
    在这里插入图片描述
    ReceiveLogs01、ReceiveLogs02 将接收消息打印在控制台
package com.ql.rabbitmq.five;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消息接收
 */
public class ReceiveLogs01 {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //声明一个队列 临时队列
        /**
         * 生成一个临时队列、队列的名称是随机的
         * 当消费者断开连接后队列就自动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        /**
         * 绑定交换机与队列
         */
        channel.queueBind(queueName, EXCHANGE_NAME,"");
        System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("ReceiveLogs01接收到的消息:"+new String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag->{});
    }
}

EmitLog 发送消息给两个消费者

package com.ql.rabbitmq.five;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * 发消息 交换机
 */
public class EmitLog {
    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+ message);
        }
    }
}

运行测试
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Direct exchage 直接交换机(路由模式)

队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示,也可称该参数为 binding key。
创建绑定:channel.queueBind(queueName, EXCHANGE_NAME, routingKey);绑定之后的意义由其交换机类型决定。
Direct exchage 工作方式:消息只去到它绑定的 routingKey 队列中去。
在这里插入图片描述
图中,生产者发布消息到 Exchange 上,绑定键为 orange 的消息会被发布到队列 Q1,绑定键为 black、green 的消息会被发布到队列 Q2。

  • 多重绑定
    在这里插入图片描述Exchage 的绑定的类型是 direct ,但是它绑定的多个队列的 key 如果相同,表现和 fanout 有点类似。
  • 实战List item
package com.ql.rabbitmq.six;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsDirect01 {
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare("console", false, false, false, null);
        channel.queueBind("console", EXCHANGE_NAME, "info");
        channel.queueBind("console", EXCHANGE_NAME, "warning");
        System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("ReceiveLogsDirect01接收到的消息:"+new String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume("console", true, deliverCallback, consumerTag->{});
    }
}

package com.ql.rabbitmq.six;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsDirect02 {
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare("disk", false, false, false, null);
        channel.queueBind("disk", EXCHANGE_NAME, "error");
        System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("ReceiveLogsDirect02接收到的消息:"+new String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume("disk", true, deliverCallback, consumerTag->{});
    }
}

package com.ql.rabbitmq.six;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * 发消息 交换机
 */
public class DirectLogs {
    //交换机的名称
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+ message);
        }
    }
}

运行测试
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Topics

  • 概念
    发送到类型为 Topic 交换机的消息的 RoutingKey 不能随意写,必须是一个单词列表,以点号隔开。如“stock.used.jhhd”。单词列表最多不能超过255个字节。
    *(星号)可以代替一个单词。
    #(井号)可以替代零个或多个单词。
    在这里插入图片描述
    上图,如 lazy.orange.jkljkl 被队列Q1和Q2收到。
    当一个队列绑定建是 #,那么这个队列将接收所有数据,就有点像 fanout 了
    如果队列绑定键当中没有 # 和 * 出现,那么该队列绑定类型就是 direct 了。
  • 实战
    消费者
package com.ql.rabbitmq.seven;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 声明主题交换机 及相关队列
 * 消费者C1
 */
public class ReceiveLogsTopic01 {
    //交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明队列
        String queueName = "Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收队列:"+queueName+"  绑定键:"+message.getEnvelope().getRoutingKey());
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag->{});
    }
}
package com.ql.rabbitmq.seven;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 声明主题交换机 及相关队列
 * 消费者C2
 */
public class ReceiveLogsTopic02 {
    //交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明队列
        String queueName = "Q2";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收队列:"+queueName+"  绑定键:"+message.getEnvelope().getRoutingKey());
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag->{});
    }
}

生产者

package com.ql.rabbitmq.seven;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 */
public class EmitLogTopic {
    //交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        Map<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("lazy.orange.jkljkl", "被队列Q1和Q2收到");
        bindingKeyMap.put("asdf.orange.jkljkl", "被队列Q1收到");
        bindingKeyMap.put("lazy.asdf.jkljkl", "被队列Q2收到");
        bindingKeyMap.put("lazy.ordddd.rabbit", "被队列Q2收到一次");
        bindingKeyMap.put("lazy.sadf.jkljkl.adsf", "被队列Q2收到");
        bindingKeyMap.put("sdaf.asdf.jkljkl.adsf", "被丢弃");
        for (Map.Entry<String,String> bindingKeyEntry:
            bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

运行测试
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

死信队列

死信,顾名思义就是无法被消费的消息。
producer 将消息投递到 broker 或者直接到 queue 里了,有些时候由于特定原因导致 queue 中的某些消息无法被消费,这些消息若没有后续的处理,就变成了死信,有了死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。

死信的来源

  • 消息 TTL(存活时间)过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false。

死信实战

在这里插入图片描述

  • 消息TTL过期

消费者1

package com.ql.rabbitmq.eight;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * 死信队列 实战
 * 消费者1
 */
public class Consumer01 {
    //普通交换机名
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列名
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列名
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明普通和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明普通队列
        Map<String, Object> arguments = new HashMap<>();
        //过期时间 10s=10000ms
        //arguments.put("x-message-ttl", 10*1000);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "lisi");

        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //绑定普通交换机和普通队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //绑定死信交换机和死信队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收消息......");

        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("Consumer01接收的消息:"+new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag->{});
    }
}

生产者

package com.ql.rabbitmq.eight;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 死信队列之 生产者
 */
public class Producer {
    //普通交换机名
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息 设置TTL时间 单位是ms
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 0; i < 10; i++) {
            String message = "info"+(i+1);
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
        }
    }
}

消费者2

package com.ql.rabbitmq.eight;

import com.ql.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 死信队列 实战
 * 消费者2
 */
public class Consumer02 {

    //死信队列名
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("等待接收消息......");

        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("Consumer02接收的消息:"+new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag->{});
    }
}

先运行消费者1,把交换机和队列声明好,然后停止消费者1。
在这里插入图片描述
然后运行生产者发消息。
在这里插入图片描述
10s过后消息以为TTL过期到死信队列。
在这里插入图片描述
然后运行消费者2接收消息。
在这里插入图片描述
在这里插入图片描述

  • 队列达到最大长度
    把生产者的TTL参数注释掉
    在这里插入图片描述
    然后把消费者1 里设置正常队列限制的长度

在这里插入图片描述
因为对正常队列的参数重新设置,需要先删除之前声明的正常队列
在这里插入图片描述
运行消费者1,然后再停止。可以看到重新声明的队列有三个参数
在这里插入图片描述
然后运行生产者,发了10个消息,4个被移到死信队列
在这里插入图片描述
再运行消费者2 把4个消息消费掉
在这里插入图片描述
在这里插入图片描述

  • 消息被拒绝
    修改消费者1
    在这里插入图片描述
    然后删掉正常队列后启动消费者1,和生产者
    在这里插入图片描述
    在这里插入图片描述
    然后启动消费者2消费info5消息
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/787946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

蓝桥杯省赛真题——最少刷题数

2022年第13届省赛&#xff0c;蓝桥杯真题。 (本笔记适合初通 Python 的 coder 翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣经”教程《 python 完全自学教程》&#xff0c;不仅仅是基础那么简单…… 地址&…

【前端知识】React 基础巩固(三十三)——Redux的使用详解

React 基础巩固(三十三)——Redux的使用详解 Redux的使用详解 针对React 基础巩固&#xff08;三十二&#xff09;中的案例&#xff0c;我们希望抽取页面中共有的代码&#xff08;例如下方的代码&#xff09;&#xff0c;使用高阶组件统一拦截。 constructor() {super();this.…

Python入门【 for循环和可迭代对象遍历、嵌套循环和综合练习、continue语句、else语句、循环代码优化】(八)

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱敲代码的小王&#xff0c;CSDN博客博主,Python小白 &#x1f4d5;系列专栏&#xff1a;python入门到实战、Python爬虫开发、Python办公自动化、Python数据分析、Python前后端开发 &#x1f4e7;如果文章知识点有错误…

两个数组的dp问题(2)--动态规划

一)交错字符串: 97. 交错字符串 - 力扣&#xff08;LeetCode&#xff09; 一)确定一个状态标识: 如果我选择s1的一段区间&#xff0c;再进行选择s2得一段区间那么s3这个字符串的长度就已经固定了 预处理:在s1字符串s2字符串和s3字符串前面加上一个虚拟字符&#xff0c;让下标从…

力扣热门100题之最小覆盖子串【困难】【滑动窗口】

题目描述 给你一个字符串 s 、一个字符串 t 。返回 s 中涵盖 t 所有字符的最小子串。如果 s 中不存在涵盖 t 所有字符的子串&#xff0c;则返回空字符串 “” 。 注意&#xff1a; 对于 t 中重复字符&#xff0c;我们寻找的子字符串中该字符数量必须不少于 t 中该字符数量。…

java项目之人才公寓管理系统(ssm+mysql+jsp)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的人才公寓管理系统。技术交流和部署相关看文章末尾&#xff01; 开发环境&#xff1a; 后端&#xff1a; 开发语言&#xff1a;Java 框架&…

JVM-提问纯享版

一、内存区域 介绍下 Java 内存区域&#xff08;运行时数据区&#xff09;内存分配方式内存分配并发问题对象的访问定位的两种方式&#xff08;句柄和直接指针两种方式&#xff09; 二、垃圾回收 如何判断对象是否死亡&#xff08;两种方法&#xff09;。简单的介绍一下强引…

Web3教程| 什么是地址监控?如何使用地址监控追踪黑客地址?

在当今Web3世界里&#xff0c;保护个人资产安全至关重要。据报道在2023年上半年&#xff0c;Web3领域因黑客攻击事件造成的损失高达4.794亿美元。 此外&#xff0c;10多个公链遭受黑客攻击&#xff0c;其中以太坊链遭受的损失最多&#xff0c;约为2.87亿美元。这些黑客的存在迫…

maven配置下载源

有得时候项目中会配置默认的谷歌作为源下载依赖这样会导致下载很慢&#xff0c;我们可以将谷歌的源更改为国内的阿里源&#xff0c;这样下载就会比较快 查看pom.xml文件 若是在配置时没有指定该依赖的下载源的话&#xff0c;就是默认去谷歌源下载&#xff0c;这时我们在项目po…

KEGG 通路如何找

链接&#xff1a; KEGG: Kyoto Encyclopedia of Genes and Genomes 学习链接&#xff1a; 科研干货&#xff5c;KEGG信号通路数据库轻松上手_哔哩哔哩_bilibili 示例&#xff1a;我要找人的结直肠癌信号通路&#xff1a; 1. 2. 3. 4. over

pytest自动化测试框架,真正做到从0到1由浅入深详细讲解【万字级】

目录 嗨咯铁汁们&#xff0c;很久不见&#xff0c;我还是你们的老朋友凡叔&#xff0c;这里也感谢各位小伙伴的点赞和关注&#xff0c;你们的三连是我最大的动力哈&#xff0c;我也不会辜负各位的期盼&#xff0c;这里呢给大家出了一个pytest自动化测试框架由浅入深详细讲解。 …

mysql(二)SQL语句

目录 一、SQL语句类型 二、数据库操作 三、数据类型 四、创建 五、查看 六、更改 七、增、删、改、查 八、查询数据 一、SQL语句类型 SQL语句类型&#xff1a; DDL DDL&#xff08;Data Definition Language&#xff0c;数据定义语言&#xff09;&#xff1a;用于…

项目经理:我不是不喜欢工作,只是不喜欢开会

大家好&#xff0c;我是老原。 如何高效的开会&#xff0c;我觉得我可太有发言权了&#xff01;作为项目经理&#xff0c;每天就是开会&#xff0c;开会还好&#xff0c;还经常是无效会议。 职场人最讨厌的事情除了加班就是开会了。但很多人认为开会比加班更可恶&#xff0c;…

hackthebox—Sau

文章目录 1、信息收集2、ssrf3、命令执行 1、信息收集 fscan扫描ip发现存在22和55555&#xff0c;但是实际上这个fscan扫描的不全 再试试nmap nmap -sV -sC -sT -v -T4 10.10.11.224 有三个端口&#xff0c;其中80应该是只能内网访问&#xff0c;看来需要借助ssrf了。 2、s…

Windows Server 2022 中文版、英文版下载 (updated Jul 2023)

Windows Server 2022 中文版、英文版下载 (updated Jul 2023) Windows Server 2022 正式版&#xff0c;2023 年 7 月更新 请访问原文链接&#xff1a;https://sysin.org/blog/windows-server-2022/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&a…

华为OD机试真题 Java 实现【文件目录大小】【2023 B卷 100分】,附详细解题思路

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明4、再输入5、再输出6、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题…

Python内置函数系统学习(1)——数据转换与计算(详细语法参考 + 参数说明 + 具体示例)

世界上最重要的东西就是纸&#xff0c;重至承载文明的崛起&#xff0c;轻至承载一个饱满厚实的人生。 &#x1f3af;作者主页&#xff1a; 追光者♂&#x1f525; &#x1f338;个人简介&#xff1a; &#x1f496;[1] 计算机专业硕士研究生&#x1f496; &#x1f31f;[2]…

最小生成树算法(Prim Kruskal)

目录 最小生成树算法总览最小生成树的定义及性质Prim&#xff08;普利姆&#xff09;算法1.朴素Prim算法算法步骤 2.堆优化Prim算法算法步骤 3.算法运用Prim算法求最小生成树流程实现朴素Prim的代码实现堆优化Prim的代码实现 Kruskal&#xff08;克鲁斯卡尔&#xff09;算法1.算…

IAR for STM8L标准库驱动ST7735 1.8‘‘LCD显示

IAR for STM8L标准库驱动ST7735 1.8’LCD显示 ✨STM8驱动ST7735 1.8’LCD屏幕的话&#xff0c;自己移植的话&#xff0c;可以参考stm32标准库驱动来移植&#xff0c;GPIO的操作方式和STM32标准库函数名都一致&#xff0c;移植起来改动量很少&#xff0c;这仅针对软件驱动方式。…

【Java基础教程】(四十六)IO篇 · 下:System类对IO的支持:错误输出、信息输出、系统输入,字符缓冲流、扫描流和对象序列化流~

Java基础教程之IO操作 下 &#x1f539;本节学习目标1️⃣ System类对 IO 的支持1.1 错误输出&#xff1a;System.err1.2 信息输出&#xff1a;System.out1.3 系统输入&#xff1a;System. in 2️⃣ 字符缓冲流&#xff1a;BufferedReader3️⃣ 扫描流&#xff1a;Scanner4️⃣…