RabbitMQ实现六类工作模式

news2024/11/27 22:25:44
😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: RabbitMQ实现六类工作模式
⏱️ @ 创作时间: 2023年07月20日

在这里插入图片描述

目录

  • 1、概述
  • 2、工具类封装
  • 3、简单列队模式
  • 4、工作列队模式
  • 5、发布/订阅模式
  • 6、路由模式
  • 7、主题模式
  • 8、交换机说明
  • 9、发布确认模式(消息确认机制)
    • 9.1、消费者实现
    • 9.2、单个确认发布(生产者实现)
    • 9.3、批量确认发布(生产者实现)
    • 9.4、异步确认发布(生产者实现)
    • 9.5、三类确认发布区别对比
  • 10、源码地址

1、概述

六模式:简单模式、工作模式、发布订阅模式、路由模式、主题模式、发布确认模式
参考代码:https://gitee.com/lhzlx/rabbit-simple-demo.git
在这里插入图片描述

2、工具类封装

在下面每种模式的笔记中,会进行代码演示,为了方便,进行工具类的封装,代码如下:

public class RabbitMqUtils {
    /**
     * 得到一个连接的  channel
     *
     * @return
     * @throws Exception
     */
    public static Channel getChannel() throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        return connection.createChannel();
    }
}

3、简单列队模式

生产者通过直接将消息发送到消费者
在这里插入图片描述

代码包路径: lhz.simple

public class Producer {
    /**
     * 设置队列名称
     */
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        // 获取Channel
        Channel channel = RabbitMqUtils.getChannel();
        /*
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化   默认消息存储在内存中
         * 3.该队列是否只供一个消费者进行消费   是否进行共享   true可以多个消费者消费
         * 4.是否自动删除   最后一个消费者端开连接以后   该队列是否自动删除   true 自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello world";
        /*
         * 发送一个消息
         * 1.发送到那个交换机(可以没有)
         * 2.路由的  key是哪个
         * 3.其他的参数信息
         * 4.发送消息的消息体
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("消息发送完毕");
    }
}

消费者:

public class Consumer {
    /**
     * 设置队列名称
     */
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        // 获取Channel
        Channel channel = RabbitMqUtils.getChannel();

        // 消费队列消息的一个回调接口
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消息消费成功,内容:");
            System.out.println(message);
        };
        // 取消消费的一个回调接口   如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };
        /*
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答   true代表自动应答   false手动应答
         * 3.消费者未成功消费的回调
         * 3.消费者取消消费的的回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        System.out.println("等待接收消息....");
    }
}

4、工作列队模式

工作队列: 用来将耗时的任务分发给多个消费者,并且一个消息只能被消费一次,默认情况下RabbitMQ 将按顺序将每条消息发送给下一个消费者==(即轮询分发)==。

主要解决问题: 处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。
在这里插入图片描述

代码包路径: lhz.work

public class Producer {
    /**
     * 设置队列名称
     */
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel();) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 从控制台当中接受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("发送消息完成:" + message);
            }
        }
    }
}

消费者:
消费者需要两个,分别为:Consumer01 Consumer02两者代码一致,只是名称不同,下面以Consumer01 代码为例:

// Consumer01与Consumer02代码一致
public class Consumer01 {
    /**
     * 设置队列名称
     */
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        // 获取Channel
        Channel channel = RabbitMqUtils.getChannel();
        // 消费队列消息的一个回调接口
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("Consumer01消息消费成功,内容:");
            System.out.println(message);
        };
        // 取消消费的一个回调接口   如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("Consumer01消息消费被中断");
        };
        
        //采用自动应答
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        System.out.println("Consumer01等待接收消息....");
    }
}

结果:
为了演示工作队列的轮询分发,需要启动两个Consumer实例,然后再启动Producer并且在控制台多次输入内容,可以看到两个Consumer会依次接收消息,结果如下:
在这里插入图片描述

5、发布/订阅模式

发布/订阅模式是:
生产者将消息发送到交换机中,由交换机发送给不同类型的消费者,做到发布一次,消费多个,如果消费者绑定的队列名称一样,将按照轮询进行消费,所以保证了:同一个队列的中的消息不会被重复消费;

比如:
它包含一个生产者、多个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,所有消费者接收并消费消息。

在这里插入图片描述

代码包路径: lhz.fanout

public class Producer {

    /**
     * 定义交换机和队列名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
        // 获取Channel
        Channel channel = RabbitMqUtils.getChannel();

        /*绑定的交换机 参数1交互机名称 参数2 exchange类型 */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        for (int i = 0; i < 10; i++) {
            String message = "消息:" + i;
            // 发送一个消息,1.发送到那个交换机,2.路由的  key是哪个,3.其他的参数信息,4.发送消息的消息体
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        }
        System.out.println("消息发送完毕");
    }
}

消费者:
消费者需要两个,分别为:Consumer01 Consumer02两者代码一致,只是名称不同,下面以Consumer01 代码为例:

/**
*两个消费者逻辑代码一样,只是绑定的队列不同,Consumer01:consumerFanout_sms;Consumer02:consumerFanout_email
*/
public class Consumer01(02) {
    /**
     * 设置队列及交换机名称
     */
    private static final String QUEUE_NAME = "consumerFanout_sms";
    private static final String QUEUE_NAME = "consumerFanout_email";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
        // 获取Channel
        Channel channel = RabbitMqUtils.getChannel();

        //消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 消费队列消息的一个回调接口
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("Consumer01(02)消息消费成功,内容:");
            System.out.println(message);
        };
        // 取消消费的一个回调接口   如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("Consumer01(02)消息消费被中断");
        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        System.out.println("Consumer01(02)等待接收消息....");
    }
}

结果:
先启动一次生产者,再启动两个消费者,绑定到交换机A,以及两个不同的队列、然后再重新启动生产者,绑定到交换机A;

在生产者发生消息以后,两个不同名称的消费者队列都可以接收到消息相同的内容,这些因为不同的消费者绑定了同一个交换机
注意:如果消费者绑定的队列名称一样,将按照轮询进行消费,所以保证了:同一个队列的中的消息不会被重复消费;**

6、路由模式

路由模式:
跟发布订阅模式类似,在订阅模式的基础上修改了exchange类型以及加上了路由键,如果消费者的路由键一样,其效果和发布/订阅模式一致订阅模式是分发到所有绑定到交换机的所有队列,路由模式只分发到绑定在交换机上面指定路由键的队列一个队列可以绑定多个不同的路由

注意: 消息可能匹配多个消费者,但是同一个队列的中的消息不会被重复消费;

我们可以看一下下面这张图:

在这里插入图片描述
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green。
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black、green的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

说明: 生产者发送消息到交换机,同时定义了一个路由 routingKey,多个消费者声明多个队列,与交换机进行绑定,同时定义路由 routingKey,只有和生产者发送消息时的路由 routingKey相同的消费者才能消费数据

注意: 如果交换机和路由绑定后,需要修改路由就要修改交换机名称

代码包路径:lhz.route

生产者:

public class Producer {

    /**
     * 定义交换机和队列名称
     */
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        // 获取Channel
        Channel channel = RabbitMqUtils.getChannel();

        /*绑定的交换机 参数1交互机名称 参数2 exchange类型 */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 发送消息
        String message = "", sendType = "";
        for (int i = 0; i < 20; i++) {
            if (i % 2 == 0) {
                sendType = "info";
                message = "我是 info 级别的消息类型:" + i;
            } else {
                sendType = "error";
                message = "我是 error 级别的消息类型:" + i;
            }
            System.out.println("[send]:" + message + "  " + sendType);

            // 第二个参数就是路由键
            channel.basicPublish(EXCHANGE_NAME, sendType, null, message.getBytes());
        }
        System.out.println("消息发送完毕");
    }
}

消费者:
消费者需要两个,分别为:Consumer01 Consumer02两者代码一致,只是名称不同,下面以Consumer01 代码为例:

/**
* 两个消费者逻辑代码一样,只是绑定的队列不同和不同的路由键
* Consumer01:"info"、"consumer_info";Consumer02:"error"、"consumer_error";
*/
public class Consumer01(02) {
    /**
     * 设置队列及交换机名称
     */
    private static final String ROUTING_KEY = "info";
    private static final String ROUTING_KEY = "error";
    private static final String QUEUE_NAME = "consumer_info";
    private static final String QUEUE_NAME = "consumer_error";
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        // 获取Channel
        Channel channel = RabbitMqUtils.getChannel();

        //消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        // 消费队列消息的一个回调接口
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("Consumer01(02)消息消费成功,内容:");
            System.out.println(message);
        };
        // 取消消费的一个回调接口   如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("Consumer01(02)消息消费被中断");
        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        System.out.println("Consumer01(02)等待接收消息....");
    }
}

结果:
先启动一次生产者,再启动两个消费者,绑定到交换机A,以及两个不同的队列和不同的路由键;最后重新启动生产者,绑定到交换机A;

7、主题模式

主题模式:
routing 路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由routingKey,类似于SQL中 = 和 like 的关系

注意:消息可能匹配多个消费者,但是同一个队列的中的消息不会被重复消费;
要求:

  • 要求
    Topic 模式消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 “.” 或者 “#” 分隔开。这些单词可以是任意单词,这个单词列表最多不能超过 255 个字节。

  • 分隔符

    “*(星号)”:可以代替一个单词

    “#(井号)”:可以替代零个或多个单词

  • 比如

    • 中间带 orange 带3个单词: *.orange.*
    • 最后一个词是 rabbit 的3 个单词:*.*.rabbit
    • 以 lazy开头的多个单词lazy.#

图示:
在这里插入图片描述

注意: 如果交换机和路由绑定后,需要修改路由就要修改交换机名称

代码包路径:lhz.toptic

生产者:

public class Producer {

    /**
     * 定义交换机和队列名称
     */
    private static final String EXCHANGE_NAME = "topic";

    public static void main(String[] args) throws Exception {
        // 获取Channel
        Channel channel = RabbitMqUtils.getChannel();

        /*绑定的交换机 参数1交互机名称 参数2 exchange类型 */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 定义路由key
        String routingKey = "mq.info.log";
        String message = "topic_exchange_msg:" + routingKey;
        System.out.println("[send] = " + message);
        // 发送消息

        // 第二个参数就是路由键
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("消息发送完毕");
    }
}

消费者:

public class Consumer {

    /**
     * 设置路由匹配规则
     */
    private static final String ROUTING_KEY = "#.log";
    /**
     * 设置队列及交换机名称
     */
    private static final String QUEUE_NAME = "topic_consumer";
    private static final String EXCHANGE_NAME = "topic";

    public static void main(String[] args) throws Exception {
        // 获取Channel
        Channel channel = RabbitMqUtils.getChannel();

        //消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        // 消费队列消息的一个回调接口
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消息消费成功,内容:");
            System.out.println(message);
        };
        // 取消消费的一个回调接口   如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        System.out.println("等待接收消息....");
    }
}

结果:
先启动消费者,绑定到交换机A,通过通配符模糊匹配路由;再启动生产者,绑定到交换机A,设置具体的路由键;

8、交换机说明

在没有交换机的情况,生产者直接往队列发送消息,消费者绑定队列消费相消息,但是同一个队列中一个消息只会被消费一次,所以无法满足一个消息同时被多个消费者使用;

​交换机的作用就可以解决这个问题,一个交换机可以绑定多个不同的队列,一个队列绑定多个消费者;生产者将消息发送到交换机中,所有绑定了该交换机的队列都可以收到消息;所以生产者发送一次消息,可以被不同的队列**(消费者)**进行消费;当同一个队列中存在多个消费者时,消息不会被重复消费;

9、发布确认模式(消息确认机制)

概念: 生产者将信道设置成 确认(confirm) 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。

优点: confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者就可以在等待信道返回确认的同时继续发送下一条消息,当消息最终得到ack之后,生产者可以通过回调方法来处理该ack消息;如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息;

使用: 发布确认模式默认是没有开启,生产者通过调用方法 confirmSelect实现开启
在这里插入图片描述

9.1、消费者实现

代码包路径: lhz.confirm

public class Consumer {
    
    //设置队列名称
    private final static String QUEUE_NAME = "confirm_queue";

    public static void main(String[] args) throws Exception {
        // 获取Channel
        Channel channel = RabbitMqUtils.getChannel();
        
        // 消费队列消息的一个回调接口
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消息消费成功,内容:");
            System.out.println(message);
        };
        // 取消消费的一个回调接口   如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };
        
        // 消费者消费消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        System.out.println("等待接收消息....");
    }
}

9.2、单个确认发布(生产者实现)

单个确认发布:
是一种简单的确认方式,它是一种**同步确认发布** 的方式,也就是发布的消息只有被确认发布之后,后续的消息才能继续发布,通过waitForConfirmsOrDie(long outTime)方法,指定时间范围内(单位:毫秒)这个消息没有被确认那么它将抛出异常;通过waitForConfirms()对broker响应的消息进行确认;

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量,当然对于某 些应用程序来说这可能已经足够了。

代码包路径: lhz.confirm
类:SingleProducer
说明: 确认发布的实现只是对生产者代码做修改,所以消费者代码不变,参考:《9.1、消费者实现》
步骤: 启动消费者,发送消息,然后观察耗时即可;

public class SingleProducer {

    // 设置队列名称
    private final static String QUEUE_NAME = "confirm_queue";
    // 发送消息数量
    private final static Integer MESSAGE_COUNT = 100;

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //开启发布确认
            channel.confirmSelect();
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                //服务端返回确认状态,如果 false或超时时间内未返回,生产者可以消息重发
                boolean flag = channel.waitForConfirms();
                if (flag) {
                    System.out.println("消息发送成功");
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
        }
    }
}

9.3、批量确认发布(生产者实现)

批量确认发布:
与单个等待确认消息相比,会先发布一批消息然后一起确认 可以极大地提高吞吐量,它也是一种**同步确认发布** 的方式。

这种方式的缺点就是: 当消息发布出现问题时,不知道是哪个消息出现问题了;

代码包路径:lhz.confirm
类:BatchProducer
**说明:**确认发布的实现只是对生产者代码做修改,所以消费者代码不变,参考:《9.1、消费者实现》
步骤: 启动消费者,发送消息,然后观察耗时即可;

public class BatchProducer {

    // 设置队列名称
    private final static String QUEUE_NAME = "confirm_queue";
    // 发送消息数量
    private final static Integer MESSAGE_COUNT = 100;

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 开启发布确认
            channel.confirmSelect();
            // 批量确认消息大小
            int batchSize = 100;
            // 未确认消息个数
            int outstandingMessageCount = 0;
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                outstandingMessageCount++;
                // 达到设置的批处理大小时,进行确认
                if (outstandingMessageCount == batchSize) {
                    channel.waitForConfirms();
                    outstandingMessageCount = 0;
                }
            }
            // 为了确保还有剩余没有确认消息,进行再次确认
            if (outstandingMessageCount > 0) {
                channel.waitForConfirms();
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
        }
    }
}

9.4、异步确认发布(生产者实现)

异步确认发布:
虽然编程逻辑比上两个要复杂,但是可靠性和效率更高;他是利用回调函数来保证是否投递成功。逻辑图如下:
在这里插入图片描述

代码包路径:lhz.confirm
类:AsynProducer
**说明:**确认发布的实现只是对生产者代码做修改,所以消费者代码不变,参考:《9.1、消费者实现》
步骤: 启动消费者,发送消息,然后观察耗时即可;

public class AsynProducer {
    /**
     * 设置队列名称
     */
    private final static String QUEUE_NAME = "confirm_queue";

    /**
     * 发送消息数量
     */
    private final static Integer MESSAGE_COUNT = 500;

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //开启发布确认
            channel.confirmSelect();
            /*
             * 线程安全有序的一个哈希表,适用于高并发的情况
             * 1.轻松的将序号与消息进行关联
             * 2.轻松批量删除条目    只要给到序列号
             * 3.支持并发访问
             */
            ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap();

            /*
             * 确认收到消息的一个回调
             * 1.消息序列号
             * 2.true可以确认小于等于当前序列号的消息
             *   false确认当前序列号消息
             */
            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                if (multiple) {
                    //返回的是小于等于当前序列号的未确认消息    是一个  map
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
                    //清除该部分未确认消息
                    confirmed.clear();
                } else {
                    //只清除当前序列号的消息
                    outstandingConfirms.remove(sequenceNumber);
                }
            };
            // 未被确认消息回调
            ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                String message = outstandingConfirms.get(sequenceNumber);
                System.out.println("未被确认消息:" + message + ",序列号" + sequenceNumber);
            };
            /*
             * 添加一个异步确认的监听器
             * 1.确认收到消息的回调
             * 2.未收到消息的回调
             */
            channel.addConfirmListener(ackCallback, nackCallback);
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "消息" + i;
                /*
                 * channel.getNextPublishSeqNo()获取下一个消息的序列号
                 * 通过序列号与消息体进行一个关联
                 * 全部都是未确认的消息体
                 */
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
        }
    }
}

9.5、三类确认发布区别对比

  • 单独发布消息
    同步等待确认,简单,但吞吐量非常有限,较耗时。

  • 批量发布消息
    批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条 消息出现了问题,效率较高。

  • 异步处理:
    最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些,效率高。

10、源码地址

源码地址:https://gitee.com/lhzlx/rabbit-simple-demo.git

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/764072.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Jenkins持续集成自动化测试

目录 执行集成构建 持续&#xff0c;自动地构建&测试软件项目代码管理&#xff08;git/svn&#xff09;>编译&#xff08;maven/ant/gradle&#xff09;>打包>测试环境部署>自动化测试 研发体系中的迭代流程 1 源码分支管理&#xff1a; git或者svn, 将不同开…

C++基础与深度解析02——

0. 前言 接上文C基础与深度解析01&#xff0c;本篇主要介绍C的输入输出流&#xff0c;如下 1. 基础概念 1.1头文件 通常&#xff0c;在一个 C 程序中&#xff0c;只包含两类文件—— .cpp 文件和 .h 文件。其中&#xff0c;.cpp 文件被称作 C 源文件&#xff0c;里面放的都是…

【C++】STL——vector的有关空间的函数介绍和使用、size和capacity函数、resize和reserve函数

文章目录 1.vector的使用2.vector空间增长问题&#xff08;1&#xff09;size 获取数据个数&#xff08;2&#xff09;capacity 获取容量大小&#xff08;3&#xff09;empty 判断是否为空&#xff08;4&#xff09;resize 改变vector的size&#xff08;5&#xff09;reserve 改…

Alvas.Audio v2019 Crack

Alvas.Audio v2019 Crack 该库使C#和VB.Net程序员能够创建执行&#xff08;包括混合声音信息&#xff09;、捕获、转换和编辑音频的应用程序。 阿尔瓦斯。音频是C#音乐库。网络程序员。 这使你能够生产。NET程序&#xff0c;例如Winforms/WPF/Windows服务/控制台录音机、Int…

❤️创意网页:使用CSS和HTML创建令人惊叹的3D立方体

✨博主&#xff1a;命运之光 &#x1f338;专栏&#xff1a;Python星辰秘典 &#x1f433;专栏&#xff1a;web开发&#xff08;简单好用又好看&#xff09; ❤️专栏&#xff1a;Java经典程序设计 ☀️博主的其他文章&#xff1a;点击进入博主的主页 前言&#xff1a;欢迎踏入…

经典文献阅读之--SRIF-based LiDAR-IMU Localization(SRIF的LiDAR-IMU自动驾驶鲁棒定位)

0. 简介 对于车辆来说&#xff0c;我们更希望能够得到一个有效的定位系统&#xff0c;能够保证高精度的同时&#xff0c;拥有较高的鲁棒性&#xff0c;而《Robust SRIF-based LiDAR-IMU Localization for Autonomous Vehicles》就是这样一篇文章&#xff0c;在各种场景中实现了…

起名大师,支持多种取名方式,根据自己的喜好去选择

软件功能&#xff1a; 1.参考宝宝姓氏、性别、生辰八字、天格、地格辅助用户为宝宝取名。 2.一次可生产数千个好名字&#xff0c;您还可根据笔画数、拼音、五行等筛选喜欢的名字。 3.提供10余种方法供起名选择&#xff0c;比如指定取名&#xff0c;谐音取名&#xff0c;生日取…

【百度】判断ip地址是否合法

在LeetCode上没有看到这个题目&#xff0c;加上对String的API记得不清楚&#xff0c;导致这个题目没有写得很好&#xff0c;许愿面试官能够仁慈一点 一个合法的ip地址应该有&#xff1a; 三个点将字符串划分为4个数字数字的大小[0,255]&#xff0c;且数字不能为空 合理应用St…

mycat 垂直分库与水平分表使用详解

说明 在了解mycat的常用分片规则之前,有必要再对涉及到分片规则相关的几个配置文件做深入的了解,包括:schema.xml,server.xml,rule.xml等, 其中最核心的schema.xml文件是配置分片规则的入口文件,有必要对该配置文件中的关键参数做了解,且看下面这幅图,回顾下里面的配置…

【C++】二叉搜索树KV模型

最典型的一个场景&#xff0c;自动翻译软件&#xff0c;输入中文&#xff0c;输出对应的英文&#xff0c;输入英文&#xff0c;输出对应的中文。 可以用一颗搜索二叉树来实现这一功能。 K->key V->val 基础结构和普通搜索二叉树保持一致&#xff0c;只是成员多了一个_val…

关于Tab制表符,点击一次跳很多字符的问题解决

首先在出现问题的地方右键鼠标&#xff0c;出现后点击段落。 进入后点击左下角的制表位 进入后点击全部清除&#xff0c;然后确认&#xff0c;问题就解决了&#xff08;哪里有问题就处理哪里&#xff09;

天纵竞赛系统助力江苏省“苏小登杯”不动产登记技能竞赛暨首届全国赛省级选拔赛

7月14日&#xff0c;第四届江苏省“苏小登杯”不动产登记技能竞赛暨首届全国赛省级选拔赛在苏州广播电视总成功举办。天纵竞赛系统提供核心软件技术及其配套硬件支持。 本次竞赛由江苏全省13支队伍、52名一线不动产登记人员参加比赛&#xff0c;竞赛环节包括笔试、现场竞答、代…

性能测试 —— JMeter分布式测试及其详细步骤

性能测试概要 性能测试是软件测试中的一种&#xff0c;它可以衡量系统的稳定性、扩展性、可靠性、速度和资源使用。它可以发现性能瓶颈&#xff0c;确保能满足业务需求。很多系统都需要做性能测试&#xff0c;如Web应用、数据库和操作系统等。 性能测试种类非常多&#xff0c…

windows操作小技巧1:文件批操作更改类型

今日更新一个Windows操作小技巧: 日常生活中我们有批量操作更改文件后缀名&#xff08;类型&#xff09;的需要&#xff1a; 比如这有五个.txt文本文件&#xff0c;我要想将其批量改为.html该如何操作呢&#xff1f; 首先新建文本文档&#xff1a; 其次在新建的文本文档输入以…

B 端软件:常见知识梳理

前言 我一直从事企业级软件研发工作&#xff0c;也就是我们通常称之为 B 端软件。近年来&#xff0c;我的工作重心主要在研发低代码平台和 aPaaS 平台&#xff0c;这使我对 B 端软件有了更深入的理解。 和 B 端软件对应的就是我们熟悉的 C 端软件&#xff0c;我们手机中安装的那…

VsCode添加Vue模版代码片段

文章目录 VsCode添加Vue模版代码片段1. 复制一段已有要制作模板的Vue代码&#xff0c;比如&#xff1a;2. 粘贴到下方链接的工具网站&#xff0c;可自动生成模板代码的片段3. VsCode中设置代码片段3-1 打开菜单&#xff1a;首选项-用户片段3-2 出现如下的搜索栏&#xff0c;搜索…

简易注册中心监控NAS断电断网

日常使用NAS过程中&#xff0c;偶尔会出现家里断电或者断网的情况&#xff0c;NAS自带网络断开的通知功能&#xff0c;但需要是恢复网络链接后才会通知&#xff0c;而此时都恢复了&#xff0c;再通知也就没那么重要&#xff0c;还有断电情况下也是需要回家才能知道&#xff0c;…

opencv -12 图像运算之按 《位或》 运算(图像融合图像修复和去除)

位或运算 或运算的规则是&#xff0c;当参与或运算的两个逻辑值中有一个为真时&#xff0c;结果就为真。其逻辑关系可以类比为如图 所示的并联电路&#xff0c;两个开关中只要有任意一个闭合时&#xff0c;灯就会亮。 3-5 对参与或运算的算子的不同情况进行了说明&#xff0c;…

浅谈设计模式之工厂模式

0 工厂模式的介绍 工厂模式属于创建型模式&#xff0c;是Java中最常见的设计模式之一。工厂模式提供了一种将对象的实例化过程封装在工厂类的方式&#xff0c;通过工厂模式可以屏蔽掉对象的创建过程&#xff0c;对外提供了一种统一的接口来创建不同类型的对象。 优点&#xf…

数据结构01-线性结构-链表栈队列-队列篇

文章目录 参考&#xff1a;总结大纲要求线性结构-队列QQ号码解密 参考&#xff1a; 总结 本系列为C数据结构系列&#xff0c;会介绍 线性结构&#xff0c;简单树&#xff0c;特殊树&#xff0c;简单图等。本文为线性结构部分。 大纲要求 线性结构 【 3 】链表&#xff1a;单…