RabbitMQ 的工作模式

news2025/1/15 3:58:49

目录

工作模式

Simple(简单模式)

Work Queue(工作队列)

Publish/Subscribe(发布/订阅)

Exchange(交换机) 

Routing(路由模式)

Topics(通配符模式)

RPC(RPC通信)

Publisher Confirms(发布确认)

代码实现

Simple(简单模式)

生产者代码

消费者代码

Work Queues(工作队列)

生产者代码

消费者代码

Publish/Subscribe(发布/订阅)

生产者代码

消费者代码

Routing(路由模式)

生产者代码

消费者代码

Topics(通配符模式)

生产者代码

消费者代码

RPC(RPC通信)

客户端代码

服务端代码

Publisher Confirms(发布确认)

Publishing Messages Individually(单独确认)

Publishing Messages in Batches(批量确认)

Handling Publisher Confirms Asynchronously(异步确认)


RabbitMQ 共提供了 7 种工作模式进行消息传递:

在本篇文章中,我们就来学习 RabbitMQ 的工作模式,我们首先来了解这 7 种工作模式分别是怎样的

工作模式

Simple(简单模式)

P 表示 生产者,是消息的发送方

C 表示 消费者,是消息的接收者

Queue:表示 消息队列,用于缓存消息,生产者生产的消息发送到队列中,消费者从队列中取出消息

简单模式下,只有一个生产者和一个消费者,生产者生产的消息存储到队列中后,都由这个消费者消费

特点:一个生产者 P,一个消费者 C,消息只能被消费一次,也称为 点对点(Point-to-Point)模式

适用场景:消息只能被单个消费者处理

在 RabbitMQ 入门中的入门代码的工作模式就是简单模式

Work Queue(工作队列)

此时有 一个生产者和多个消费者

当生产者向队列中发送多条消息后,Work Queue 会将消息分配给不同的消费者,每个消费者接收到的消息不同,由多个消费者共同消费生产者生产的消息

例如:

由 A (生产者)发送不同消息,消息存储到 RabbitMQ 中,接着,由 B(消费者1) 和 C(消费者2) 共同消息 A 发送的消息,此时,RabbitMQ 选择将第一条消息分配给 B,B 消费第一条消息,RabbitMQ 将第二条消息分配给 C,C 消费第二条消息......

B 和 C 接收到的消息是不同的,这两个消费者共同消费 A 发送的所有消息

特点:消息不会重复,分配给不同的消费者

适用场景:集群环境中实现异步处理

Publish/Subscribe(发布/订阅)

其中,X 表示的是 交换机,在 发布/订阅 模式中,多了 Exchange 角色,因此,我们先来学习交换机相关知识

Exchange(交换机) 

Exchange(交换机)的作用是 接收生产者发送的消息,并将消息按照一定的规则路由到一个或多个队列中

生产者的消息都会先发送到交换机,然后再由交换机将消息路由到队列中

在前面 简单模式 工作队列模式 下,图中都没有出现交换机,但实际上,生产者生产的消息都是先发送到交换机,然后再路由到队列中的。在前两种模式下,直接使用 RabbitMQ 提供的内置交换机就可以实现,因此,并没有突出交换机的存在,但实际上生产者生产的消息不会直接投递到队列中

在 RabbitMQ 中,交换机有 4 种类型:FanoutDirectTopic Headers,不同的类型有着不同的路由策略

AMQP 协议中还有两种类型,System 自定义,在这里,我们并不重点关注

Fanout:广播,将消息交给所有绑定到交换机的队列(Publish / Subscribe 模式

Direct:定向,将消息交给符合指定 routing key 的队列(Routing 模式

Topic:通配符,将消息交给符合 routing patterm(路由模式)的队列(Topics 模式

Headers:Headers 类型的交换机通过消息头部的属性来路由消息,而不依赖路由键的匹配规则来路由消息。根据发送的消息内容中的 headers 属性进行匹配,headers 类型的交换机性能会较差,因此也不太实用,基本上也不会进行使用

Exchange(交换机)只负责转发消息,并不具备存储消息的能力,因此,若是没有任何队列与 Exchange 绑定,或是没有符合路由规则的队列,消息就会丢失

接下来,我们来看 RoutingKey BindingKey

RoutingKey:路由键,当生产者将消息发送给交换机时,会指定一个字符串,用于告诉交换机如何处理这个消息

BindingKey:绑定,RabbitMQ 中通过 Binding(绑定)将交换机与队列关联起来,在绑定时会指定一个 Binding Key,这样 RabbitMQ 就知道如何正确地将消息路由到队列

即,绑定时,需要的路由键是 BindingKey;发送消息时,需要的路由键是 RoutingKey

例如:

使用 BindingKey1 将交换机与 队列1 进行绑定,使用 BindingKey2 将交换机与 队列2 进行绑定

若在发送消息时,若设置 Routing Key 设置为 BindingKey1,交换机就会将消息路由到 队列1

即,当消息的 RoutingKey 与队列绑定的 BindingKey 相匹配时,消息才会被路由到这个队列中

其实,BindingKey 也属于路由键的一种,即,在绑定时使用的路由键,有时,也会使用 RoutingKey 表示 BindingKey,即使用 RoutingKey 表示 BindingKey 和 RoutingKey,因此,我们需要根据其使用场景进行区分

在了解了相关概念之后,我们继续看 Publish/ Subscribe 模式

上述有一个生产者 P,多个消费者 C1、C2,X 表示交换机,交换机将消息复制多份每个消费者接收到相同的消息

也就是说,生产者发送一条消息,经过交换机转发到不同的队列,不同的消费者从不同的队列中取出消息进行消费

特点:不同的消费者接收到的消息是相同的

适用场景:消息需要被多个消费者同时接收,如:实时通信或广播消息

Publish/Subscribe(发布/订阅)模式 与 Work Queue(工作队列)模式 最大的区别就是:发布/订阅 模式下,不同消费者接收到的消息是相同的;而 工作队列 模式下,不同消费者接收到的消息是不同的

Routing(路由模式)

路由模式可以看做是 发布订阅模式 的变种,其在发布订阅模式的基础上,增加了路由 key

发布订阅模式会无条件的将所有消息发送给所有消费者,而路由模式下,交换机会根据 RoutingKey 的规则,将数据筛选后发送给对应的消费者队列

也就是说,只有满足条件的队列才会收到消息

如上图所示,Q1 通过 a 与交换机进行绑定,Q2 通过 a、b 和 c 与交换机进行绑定

当 P (生产者)在发送消息时,若设置 Routing Key 设置为 a,则此时 Q1 和 Q2 的 BindingKey 都与其相匹配,消息就会被路由到 Q1 和 Q2 中

而当 P 发送消息时,设置 Routing Key 设置为 b,此时,只有 Q2 的 BindingKey 与其相匹配,消息也就只会被路由到 Q2 中

适用场景:需要根据特定规则分发消息

Topics(通配符模式)

通配符模式,则是 路由模式 的变种,在 RoutingKey 的基础上,增加了 通配符 的功能,使得匹配更加灵活

Topics 和 Routing 的基本原理相同,即:生产者将消息发送给交换机,交换机根据 RoutingKey 将消息转发给与 RoutingKey 匹配的队列

而不同的是,Routing 模式下,需要 RoutingKey 和 BingingKey 完全匹配;而 Topics 模式下,则是通配符匹配

在 BindingKey 中,存在两种特殊的字符串,用于模糊匹配

* :表示能够匹配任意一个单词

#:表示能够匹配任意多个单词(可以为 0 个)

Q1 通过 *.a.* 与交换机进行绑定,Q2 通过 *.*.b 和 c.# 与交换机进行绑定

当 P (生产者)在发送消息时,若设置 Routing Key 设置为 work.a.b,则此时 Q1 和 Q2 的 BindingKey 都能够与其相匹配,消息就会被路由到 Q1 和 Q2 中

而当 P 发送消息时,设置 Routing Key 设置为 a.a.a,此时,只有 Q1 的 BindingKey 与其相匹配,消息也就只会被路由到 Q1 中

适用场景:需要灵活匹配和过滤消息的场景

RPC(RPC通信)

RPC 通信过程中,没有生产者和消费者,而是通过两个队列实现了一个可回调的过程

例如:

客户端(Client)发送请求消息到指定队列(rpc_queue),并在消息属性中设置 reply_to 字段,这个字段指定了一个回调队列(amq.gen-Xa2...),这个回调队列用于接收服务端的响应消息

服务器(Server)从队列 rpc_queue 中取出请求消息,处理请求后,将响应消息发送到 reply_to 指定的回调队列(amq.gen-Xa2...)

客户端(Client)在回调队列上等待响应消息,一旦接收到响应,客户端就会检查消息的 correlation_id 属性,确保其是所期望的响应

简而言之,客户端将请求消息发送到 队列Q1 中,服务器从 Q1 中取出请求消息进行处理,然后将响应消息发送到 队列Q2 中,客户端从 Q2 中读取响应消息

从而实现了 客户端向服务器发送请求,服务器返回对应的响应 的功能

Publisher Confirms(发布确认)

Publisher Confirms 模式是 RabbitMQ 提供的一种确保消息可靠发送到 RabbitMQ 服务器的机制,在这种模式下,生产者可以等待 RabbitMQ 服务器的确认,以确保消息已经被服务器接收并处理

其过程为:

(1)生产者将 Channel 设置为 confirm 模式(通过调用 channel.confirmSelect() 完成),发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些 序列号与消息关联 起来,以便追踪消息的状态

(2)当消息被 RabbitMQ 服务器接收并处理后,服务器会异步地向生产者发送一个确认(ACK),其中包含消息的唯一 ID,表示消息已经送达

通过 Publisher Confirms 模式,生产者可以确保消息被 RabbitMQ 服务器成功接收,从而避免消息丢失

适用场景:对数据安全性要求较高,如金融交易,订单处理等

在基本了解了 RabbitMQ 的 7 种工作模式后,我们就来通过代码简单实现一下这 7 种工作模式

代码实现

Simple(简单模式)

简单模式下,只有一个生产者和一个消费者,生产者生产的消息存储到队列后,都由这个消费者消费

在  RabbitMQ入门-CSDN博客 中的入门代码的工作模式就是简单模式,因此,在这里就不再进行过多解释了

首先引入依赖:

        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.20.0</version>
        </dependency>

生产者代码

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost("49.232.238.62"); // ip 的默认值为 localhost
        factory.setPort(5672); // 默认值为 5672
        factory.setVirtualHost("test01"); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername("admin"); // 用户名,默认为 guest
        factory.setPassword("123456"); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare("simple.test", true, false, false, null);
        // 6. 通过 channel 发送消息到队列中
        String message = "test...";
        channel.basicPublish("", "simple.test", null, message.getBytes());
        System.out.println("消息:" + message + " 发送成功");
        // 7. 释放资源
        channel.close();
        connection.close();
    }
}

消费者代码

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost("49.232.238.62"); // ip 的默认值为 localhost
        factory.setPort(5672); // 默认值为 5672
        factory.setVirtualHost("test01"); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername("admin"); // 用户名,默认为 guest
        factory.setPassword("123456"); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare("simple.test", true, false, false, null);
        // 6. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 回调方法,当接收到消息后,自动执行该方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("成功接收到消息: " + new String(body));
            }
        };
        channel.basicConsume("simple.test", true, consumer);
        // 7. 释放资源
        channel.close();
        connection.close();
    }
}

Work Queues(工作队列)

生产者代码

工作队列模式下,由一个生产者生产消息,多个消费者共同接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收

由于我们每次连接时都要使用 IP、端口号、虚拟主机名等,因此,我们可以将它们提取出来,放到 Constants 类中:

public class Constants {
    public static final String HOST = "49.232.238.62";
    public static final int PORT = 5672;
    public static final String VIRTUAL_HOST = "test01";
    public static final String USER_NAME = "admin";
    public static final String USER_PASSWORD = "123456";
}

声明 工作队列 模式下使用的队列:

    // 工作模式
    public static final String WORK_QUEUE = "work.queue";

接下来,我们就来实现生产者的代码:

工作队列模式与简单模式的区别在于工作模式下有多个消费者,因此生产者的消费代码与简单模式下差别不大,但在发送消息时,我们一次发送 20 条消息:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        // 6. 通过 channel 发送消息到队列中
        for (int i = 0; i < 20; i++) {
            String message = "work test... " + i;
            channel.basicPublish("", Constants.WORK_QUEUE, null, message.getBytes());
        }
        System.out.println("消息发送成功!");
        // 7. 释放资源
        channel.close();
        connection.close();
    }
}

运行代码,可以看到 work.queue 队列被创建,且存储了 20 条消息

 接下来,我们继续编写消费者代码

消费者代码

消费者的代码也与简单模式下的代码差别不大,但在最后,我们并不进行资源的释放:

Consumer1:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        // 6. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 回调方法,当接收到消息后,自动执行该方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("成功接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
    }
}

Consumer2:

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        // 6. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 回调方法,当接收到消息后,自动执行该方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("成功接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
    }
}

启动 Consumer1 和 Consumer2:

由于我们之前先启动了生产者,此时再启动消费者,由于消息较少,因此,先启动的 Consumer1 会瞬间将 20 条消息消费掉

因此,再次启动 Producer,观察结果:

可以看到两个消费者分别消费了 10 条消息

Publish/Subscribe(发布/订阅)

生产者代码

发布/订阅 模式中,多了 Exchange 角色

Exchange 常见有三种类型,分别代表不同的路由规则:

Fanout:广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)

Direct:定向,将消息交给符合指定 RoutingKey 的队列(Routing 模式)

Topics:通配符,将消息交给符合 Routing Pattern(路由模式)的队列(Topics 模式)

此时,在 发布/订阅 模式下,我们就需要 声明交换机,并绑定队列和交换机

我们首先来看声明交换机

使用 channel.exchangeDeclare 方法来创建交换机,我们来看 exchangeDeclare 方法:

Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;

参数: 

exchange:交换机名称

type:交换机类型

durable:是否持久化,当为 true 时,会将交换机存盘,在服务器重启时不会丢失相关信息

autoDelete:是否自动删除,自动删除的前提是至少有一个队列或交换机与这个交换机绑定,之后与这个交换机绑定的队列或交换机都会与此解绑

internal:是否为内部使用,若设置为 true,则表示内部使用,客户端无法直接发送消息到这个交换机中,只能通过交换机路由到交换机这种方式

arguments:相关参数

其中,type 表示交换机类型,其类型为 BuiltinExchangeType,也可以为 String

我们来看  BuiltinExchangeType,它是一个枚举类型

DIRECT("direct"):定向,直连,将消息交给符合指定 RoutingKey 的队列(Routing 模式)

FANOUT("fanout"):扇形,广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)

TOPIC("topic"):通配符,将消息交给符合 Routing Pattern(路由模式)的队列(Topics 模式)

HEADERS("headers"):参数模式(较少使用)

返回值:

Exchange.DeclareOk:声明确认方法,用于指示已成功声明交换

 在 Constants 类中定义 发布/订阅 模式下使用的交换机和两个队列:

    // 广播模式
    public static final String PUBLISH_CHANGE = "fanout";
    public static final String PUBLISH_QUEUE_1 = "publish.queue.1";
    public static final String PUBLISH_QUEUE_2 = "publish.queue.2";

建立连接,并声明交换机和两个队列:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明交换机
        channel.exchangeDeclare(Constants.PUBLISH_CHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);
        // 6. 声明队列
        channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);
        channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);
    }
}

交换机的类型为 BuiltinExchangeType.FANOUT 广播模式

接着,我们使用 channel.queueBind 方法将队列和交换机进行绑定:

Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

queue:要绑定的队列名称

exchange:要绑定的交换机名称

routingKey:路由 key,路由规则

arguments:相关参数

在这里的 routingKey,其实就是 BindingKey将交换机与队列关联起来,从而让 RabbitMQ 知道如何正确地将消息路由到队列

在发布/订阅模式下,交换机类型为 fanoutroutingKey 设置为 "",表示每个消费者都可以收到全部信息

        // 7. 绑定交换机和队列
        channel.queueBind(Constants.PUBLISH_QUEUE_1, Constants.PUBLISH_CHANGE, "", null);
        channel.queueBind(Constants.PUBLISH_QUEUE_2, Constants.PUBLISH_CHANGE, "", null);

接下来,就可以发送消息了:

        // 8. 发送消息
        for (int i = 0; i < 20; i++) {
            String message = "work test... " + i;
            channel.basicPublish(Constants.PUBLISH_CHANGE, "", null, message.getBytes());
        }
        System.out.println("消息发送成功!");
        // 9. 释放资源
        channel.close();
        connection.close();

完整代码:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明交换机
        channel.exchangeDeclare(Constants.PUBLISH_CHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);
        // 6. 声明队列
        channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);
        channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);
        // 7. 绑定交换机和队列
        channel.queueBind(Constants.PUBLISH_QUEUE_1, Constants.PUBLISH_CHANGE, "", null);
        channel.queueBind(Constants.PUBLISH_QUEUE_2, Constants.PUBLISH_CHANGE, "", null);
        // 8. 发送消息
        for (int i = 0; i < 20; i++) {
            String message = "work test... " + i;
            channel.basicPublish(Constants.PUBLISH_CHANGE, "", null, message.getBytes());
        }
        System.out.println("消息发送成功!");
        // 9. 释放资源
        channel.close();
        connection.close();
    }
}

运行代码,并观察结果:

可以看到,publish.queue.1 和 publish.queue.2 中都已经存储了 20 条消息

查看 fanout 的绑定关系:

成功绑定 publish.queue.1 和 publish.queue.2:

接下来,我们继续编写消费者代码

消费者代码

交换机和队列的绑定关系已经在生产者中实现了,因此,消费者代码中可以不必再写

其实现与 工作队列模式 下是基本相同的,只需要修改读取的队列即可

Consumer1:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);
        // 6. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 回调方法,当接收到消息后,自动执行该方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("成功接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.PUBLISH_QUEUE_1, true, consumer);
    }
}

Consumer2:

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);
        // 6. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 回调方法,当接收到消息后,自动执行该方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("成功接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.PUBLISH_QUEUE_2, true, consumer);
    }
}

运行 Consumer1 和 Consumer2:

Consumer1 和 Consumer2 都接收到了这 20 条消息

Routing(路由模式)

生产者代码

Routing 模式下,队列与交换机之间的绑定,不再是任意的绑定了,而是需要指定一个 BindingKey

生产者在向 交换机 发送消息时,也需要指定消息的 RoutingKey

交换机不会将消息发送给每一个绑定的 key,而是会根据消息的 RoutingKey 进行判断,只有队列绑定时的 BindingKey 和发送消息的 RoutingKey 完全一致时,才会接收消息

先在 Constants 类中定义 路由模式下使用的交换机和队列:

    // 路由模式
    public static final String ROUTING_CHANGE = "routing";
    public static final String ROUTINT_QUEUE_1 = "routing.queue.1";
    public static final String ROUTINT_QUEUE_2 = "routing.queue.2";

路由模式下,生产者的代码与 发布/订阅模式 下的区别在于:交换机的类型不同 以及 绑定队列的 BindKey 不同

(1)交换机类型不同

在声明交换机时,交换机的类型为 BuiltinExchangeType.DIRECT

        // 5. 声明交换机
        channel.exchangeDeclare(Constants.ROUTING_CHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);

(2)声明队列

        // 6. 声明队列
        channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);

(3)交换机与队列的绑定方式不同

        // 7. 绑定交换机和队列
        channel.queueBind(Constants.ROUTINT_QUEUE_1, Constants.ROUTING_CHANGE, "a", null);
        channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "a", null);
        channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "b", null);
        channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "c", null);

此时,我们就可以发送消息了

在发送消息时,需要指定 RoutingKey

        // 8. 发送消息
        String messageA = "test a...";
        channel.basicPublish(Constants.ROUTING_CHANGE, "a", null, messageA.getBytes());
        String messageB = "test b...";
        channel.basicPublish(Constants.ROUTING_CHANGE, "b", null, messageB.getBytes());
        String messageC = "test c... ";
        channel.basicPublish(Constants.ROUTING_CHANGE, "c", null, messageB.getBytes());
        System.out.println("消息发送成功!");

完整代码:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明交换机
        channel.exchangeDeclare(Constants.ROUTING_CHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
        // 6. 声明队列
        channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);
        // 7. 绑定交换机和队列
        channel.queueBind(Constants.ROUTINT_QUEUE_1, Constants.ROUTING_CHANGE, "a", null);
        channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "a", null);
        channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "b", null);
        channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "c", null);
        // 8. 发送消息
        String messageA = "test a...";
        channel.basicPublish(Constants.ROUTING_CHANGE, "a", null, messageA.getBytes());
        String messageB = "test b...";
        channel.basicPublish(Constants.ROUTING_CHANGE, "b", null, messageB.getBytes());
        String messageC = "test c... ";
        channel.basicPublish(Constants.ROUTING_CHANGE, "c", null, messageB.getBytes());
        System.out.println("消息发送成功!");
        // 9. 释放资源
        channel.close();
        connection.close();
    }
}

运行代码,并观察结果:

 routing.queue.1 中有 1 条消息,routing.queue.2 中有两条消息

查看 routing 交换机与 队列的绑定关系:

接下来,我们继续编写消费者的代码 

消费者代码

消费者代码与 发布/订阅 模式下基本相同,只需要修改队列名称即可:

Consumer1:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);
        // 6. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 回调方法,当接收到消息后,自动执行该方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1 成功接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.ROUTINT_QUEUE_1, true, consumer);
    }
}

Consumer2:

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);
        // 6. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 回调方法,当接收到消息后,自动执行该方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2 成功接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.ROUTINT_QUEUE_2, true, consumer);
    }
}

运行结果:

Topics(通配符模式)

生产者代码

相比于 routing 模式,topics 类型的交换机在匹配规则上进行了扩展,BindingKey 支持通配符匹配

其中,RoutingKey 是一系列由 . 分割的单词,如 user.name、work.abc等

BindingKey 也和 RoutingKey 一样,由 . 分割的字符串

在 BindingKey 中可以存在两种特殊的字符串,用于模糊匹配:

* :表示能够匹配任意一个单词

#:表示能够匹配任意多个单词(可以为 0 个)

例如:

交换机 与 队列1(Q1)的 BindingKey 为 *.a.*

交换机 与 队列2(Q2)的 BindingKey 为 *.*.b

交换机 与 队列2(Q2)的 BindingKey 为 c.#

则:

若生产者的 RoutingKey 为 work.a.b,则消息会被路由到 Q1 和 Q2

若生产者的 RoutingKey 为 a.a.a,则消息会被路由到 Q1

若生产者的 RoutingKey 为 c.work.a,则消息会被路由到 Q2

若生产者的 RoutingKey 为 b.c.g,则消息会被丢弃,或是返回给生产者(需要设置 mandatory 参数)

接下来,我们就来实现 通配符模式下 的生产者:

先在 Constants 类中定义通配符模式下使用的交换机和队列:

    // 通配符模式
    public static final String TOPICS_CHANGE = "topics";
    public static final String TOPICS_QUEUE_1 = "topics.queue.1";
    public static final String TOPICS_QUEUE_2 = "topics.queue.2";

与 路由模式相比,发布订阅模式与其区别为:交换机类型不同 以及 绑定队列的 RoutingKey 不同

(1)交换机类型不同

交换机的类型为 BuiltinExchangeType.TOPIC

        // 5. 声明交换机
        channel.exchangeDeclare(Constants.TOPICS_CHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);


(2)声明队列

        // 6. 声明队列
        channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);
        channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);

(3)交换机与队列的绑定方式不同

        // 7. 绑定交换机和队列
        channel.queueBind(Constants.TOPICS_QUEUE_1, Constants.TOPICS_CHANGE, "*.a.*", null);
        channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "*.*.b", null);
        channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "c.#", null);

此时,我们就可以发送消息了

在发送消息时,需要指定 RoutingKey

        // 8. 发送消息
        String message1 = "test work.a.b";
        channel.basicPublish(Constants.TOPICS_CHANGE, "work.a.b", null, message1.getBytes());
        String message2 = "test a.a.a";
        channel.basicPublish(Constants.TOPICS_CHANGE, "a.a.a", null, message2.getBytes());
        String message3 = "test c.work.a";
        channel.basicPublish(Constants.TOPICS_CHANGE, "c.work.a", null, message3.getBytes());
        System.out.println("消息发送成功!");

完整代码:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明交换机
        channel.exchangeDeclare(Constants.TOPICS_CHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);
        // 6. 声明队列
        channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);
        channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);
        // 7. 绑定交换机和队列
        channel.queueBind(Constants.TOPICS_QUEUE_1, Constants.TOPICS_CHANGE, "*.a.*", null);
        channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "*.*.b", null);
        channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "c.#", null);
        // 8. 发送消息
        String message1 = "test work.a.b";
        channel.basicPublish(Constants.TOPICS_CHANGE, "work.a.b", null, message1.getBytes());
        String message2 = "test a.a.a";
        channel.basicPublish(Constants.TOPICS_CHANGE, "a.a.a", null, message2.getBytes());
        String message3 = "test c.work.a";
        channel.basicPublish(Constants.TOPICS_CHANGE, "c.work.a", null, message3.getBytes());
        System.out.println("消息发送成功!");
        // 9. 释放资源
        channel.close();
        connection.close();
    }
}

 运行并观察结果:

topics.queue.1 和 topics.queue.2 中都已经存储了两条消息

我们来看 topics.queue.1 中的消息:

我们继续实现消费者代码

消费者代码

Topics 模式的消费者代码与 Routing 模式下相同,只需要修改消费的队列名称即可:

Consumer1:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);
        // 6. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 回调方法,当接收到消息后,自动执行该方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1 成功接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.TOPICS_QUEUE_1, true, consumer);
    }
}

Consumer2:

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 5. 声明队列
        channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);
        // 6. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 回调方法,当接收到消息后,自动执行该方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1 成功接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.TOPICS_QUEUE_2, true, consumer);
    }
}

运行 Consumer1 和 Consumer2,并观察结果:

RPC(RPC通信)

RPC(Remote Procedure Call),远程过程调用,是一种通过网络从远程计算机上请求服务,不需要了解底层网络的技术,类似于 Http 远程调用

RabbitMQ 实现 RPC 通信,是通过两个队列实现一个可回调的过程:

其过程为:

客户端(Client)发送请求消息到指定队列(rpc_queue),并在消息属性中设置 reply_to 字段,这个字段指定了一个回调队列(amq.gen-Xa2...),这个回调队列用于接收服务端的响应消息

服务器(Server)从队列 rpc_queue 中取出请求消息,处理请求后,将响应消息发送到 reply_to 指定的回调队列(amq.gen-Xa2...)

客户端(Client)在回调队列上等待响应消息,一旦接收到响应,客户端就会检查消息的 correlation_id 属性,确保其是否是所期望的响应

接下来,我们就来实现 RPC 的客户端:

客户端代码

客户端主要实现的功能有:

1. 发送请求消息到队列中

2. 从回调队列中读取响应消息

我们先来看发送请求消息到队列的过程: 

(1)声明两个队列:消息发送到的队列(Queue) 和 回调队列(replayQueue),并声明本次请求的唯一标志 corrId

(2)将 replayQueue 和 corrId 配置到  Queue 中

接下来,需要从回调队列中读取响应消息,若我们直接从回调队列中读取响应消息,此时,可能服务端还没有处理完请求,也就未将响应消息发送到回调队列中,就读取不到响应

因此,我们可以使用阻塞队列来监听回调队列中的消息

1. 使用阻塞队列阻塞当前进程,监听回调队列中的消息,回调队列中有消息时,将响应消息放到阻塞队列中

2. 阻塞队列中有消息后,主线程被唤醒,处理返回内容

先在 Constants 类中声明 RPC 模式下使用的两个队列:

    // RPC 模式
    public static final String RPC_QUEUE_1 = "rpc.queue1";
    public static final String RPC_QUEUE_2 = "rpc.queue2";

在这里,我们就不再声明交换机了,直接使用默认的交换机

声明 消息发送的队列 和 回调队列:

        // 声明队列
        channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);
        channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);

使用 UUID 生成本次请求的唯一标志,并配置消息属性:

        // 本次请求的唯一标识
        String corrId = UUID.randomUUID().toString();

消息相关配置的类型为 BasicProperties,位于 com.rabbitmq.client.AMQP 下:

 AMQP.BasicProperties 提供了一个构造器,可以通过 builder() 来设置一些属性:

使用 correlationId 方法设置唯一标识,replyTo 方法设置回调队列:

        // 本次请求的唯一标识
        String corrId = UUID.randomUUID().toString();
        // 消息属性
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
                .builder()
                .correlationId(corrId) // 唯一ID
                .replyTo(Constants.RPC_QUEUE_2) // 回调队列
                .build();

最后调用 build 方法创建实例 

使用内置交换机发送消息:

        // 7. 发送消息
        String message = "test rpc...";
        channel.basicPublish("", Constants.RPC_QUEUE_1, basicProperties, message.getBytes());

接着,使用阻塞队列存储回调结果:

        // 阻塞队列,存放回调结果,一次获取一条消息
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

 从回调队列中接收响应消息:

        // 8. 接收服务器的响应
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到回调消息: " + new String(body));
                // 判断标识是否正确
                if(corrId.equals(properties.getCorrelationId())) {
                    queue.offer(new String(body, "UTF-8"));
                }
            }
        };
        channel.basicConsume(Constants.RPC_QUEUE_2, true, consumer);

 最后,从阻塞队列中获取响应消息:

        // 9. 获取响应消息
        String result = queue.take();
        System.out.println("result: " + result);

完整代码:

public class Client {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 使用默认的交换机
        // 5. 声明队列
        channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);
        channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);
        // 6. 设置消息属性
        // 本次请求的唯一标识
        String corrId = UUID.randomUUID().toString();
        // 消息属性
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
                .builder()
                .correlationId(corrId) // 唯一ID
                .replyTo(Constants.RPC_QUEUE_2) // 回调队列
                .build();
        // 7. 发送消息
        String message = "test rpc...";
        channel.basicPublish("", Constants.RPC_QUEUE_1, basicProperties, message.getBytes());
        // 阻塞队列,存放回调结果,一次获取一条消息
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        // 8. 接收服务器的响应
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到回调消息: " + new String(body));
                // 判断标识是否正确
                if(corrId.equals(properties.getCorrelationId())) {
                    queue.offer(new String(body, "UTF-8"));
                }
            }
        };
        channel.basicConsume(Constants.RPC_QUEUE_2, true, consumer);
        // 9. 获取响应消息
        String result = queue.take();
        System.out.println("result: " + result);
    }
}

我们继续编写服务端代码

服务端代码

服务端要实现的功能为:

1. 从队列中接收请求消息

2. 根据消息内容处理请求消息,并将响应消息返回到回调队列中

我们先来实现接收消息:

建立连接、声明队列等过程都与 客户端代码相同

但需要注意的是,我们需要设置服务端同时最多只能获取一条消息

        // 6. 设置同时最多只能获取一条消息
        channel.basicQos(1);

若不设置 basicQos,RabbitMQ 会使用默认的 Qos 设置,其 prefetchCount 默认值为 0,当 prefetchCount 为 0 时,RabbitMQ 会根据内部实现和当前网络状况等因素,可能同时发送多条消息给消费者。这也就意味着,在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有波动

而在 RPC 模式下,通常希望是一对一的消息处理,即,一个请求对应一个响应。服务端在处理完一个消息并确认后,才会接收到下一条消息

接收消息后,就可以对请求消息进行处理并返回响应结果了:

        // 7. 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 将消息发到队列2中
                AMQP.BasicProperties replayProperties = new AMQP.BasicProperties().
                        builder().
                        correlationId(properties.getCorrelationId()).
                        build();
                // 返回
                String message = new String(body);
                System.out.println("接收到消息: " + message);
                // 响应消息
                String response = "request: " + message + " 接收成功";
                channel.basicPublish("", properties.getReplyTo(), replayProperties, response.getBytes());
                // 对消息进行应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Constants.RPC_QUEUE_1, false, consumer);

需要注意的是,在这里我们需要手动对消息进行应答,而不是自动确认: 

在 RabbitMQ 中,basicConsume 方法的 autoAck 参数用于指定消费者是否会自动向消息队列确认消息

当设置为 true 时,消息队列会在将消息发送给消费者后,认为消息已经被成功消费,立即删除该条消息,这也就意味着,若消费者处理消息失败,消息就会丢失

当设置为 false 时,消息队列在将消息发送给消费者后,需要消费者显示地调用 basicAck 方式来确认消息,手动确认提供了更高的可靠性,保证消息不会被意外丢失,适用于消息处理重要且需要确保每个消息被正确处理的场景

完整代码:

public class Service {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        // 4. 创建 Channel
        Channel channel = connection.createChannel();
        // 使用默认的交换机
        // 5. 声明队列
        channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);
        channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);
        // 6. 设置同时最多只能获取一条消息
        channel.basicQos(1);
        // 7. 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 将消息发到队列2中
                AMQP.BasicProperties replayProperties = new AMQP.BasicProperties().
                        builder().
                        correlationId(properties.getCorrelationId()).
                        build();
                // 返回
                String message = new String(body);
                System.out.println("接收到消息: " + message);
                // 响应消息
                String response = "request: " + message + " 接收成功";
                channel.basicPublish("", properties.getReplyTo(), replayProperties, response.getBytes());
                // 对消息进行应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Constants.RPC_QUEUE_1, false, consumer);
    }
}

运行代码,观察结果:

Publisher Confirms(发布确认)

消息中间件,都会面临消息丢失的问题

消息丢失大概分为三种情况:

1. 生产者的问题:由于应用程序故障、网络抖动等各种原因,生产者没有成功向 broker 发送消息

2. 消息中间件的问题:生产者成功将消息发送给了 broker,但 broker 未能将消息保存好,导致消息丢失

3. 消费者的问题:broker 将消息发送给了消费者,消费者在消费消息时,未处理好,导致 broker 将消费失败的消息从队列中删除了

Rabbit 针对上述问题给出了相应的解决方案:

针对问题1,可以采用发布确认(Publisher Confirms)机制实现

针对问题2,可以通过持久化机制

针对问题3,可以采用消息应答机制

接下来,我们就来进一步学习 发布确认机制

发布确认的过程:

(1)生产者将 Channel 设置为 confirm 模式(通过调用 channel.confirmSelect() 完成),发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,以便追踪消息的状态

(2)当消息被 RabbitMQ 服务器接收并处理后,服务器会向生产者发送一个确认(ACK),其中包含消息的唯一 ID,表示消息已经送达

其中,deliveryTag 包含了确认消息的序号,此外,broker 也可以设置 channel.basicAck 方法中的 multiple 参数,表示这个序号之前的所有消息都已经被处理

发送确认机制最大的好处在于它是 异步 的,生产者可以同时发布消息和等待信道返回确认消息

当消息最终得到确认之后,生产者可以通过 回调方法 来处理该确认消息

若 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该 nack 命令

使用发布确认机制,需要将信道设置为 confirm(确认)模式:

            // 开启信道
            Channel channel = connection.createChannel();
            // 开启信道确认模式
            channel.confirmSelect();

 发布模式有 3 种确认策略,我们分别来进行学习

由于使用每种策略时都需要建立连接,因此,我们将建立连接抽取出来:

    public static Connection createConnection() throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        return connection;
    }

Publishing Messages Individually(单独确认)

单独确认模式下,每发送一条消息,RabbitMQ 会在消息被成功持久化到队列或者被消费者成功接收后,发回一个确认(acknowledgment)。生产者可以收到关于每条消息的确认信息

也就是说,生产者发送消息后会等待每条消息的确认信号。如果消息发送成功,RabbitMQ 会返回一个确认信号;如果消息失败,RabbitMQ 会返回一个负确认信号(nack)

我们先在 Constans 类中声明会使用的队列:

    // 发布确认模式
    public static final String PUBLISH_CONFIRMS_QUEUE_1 = "publish.confims.queue1";
    public static final String PUBLISH_CONFIRMS_QUEUE_2 = "publish.confims.queue2";
    public static final String PUBLISH_CONFIRMS_QUEUE_3 = "publish.confims.queue3";

我们仍使用默认的交换机进行路由 

每次都发送 200 条消息:

public class Producer {
    public static int MESSAGE_COUNT = 200;
}

 每当发送一条消息,就使用 channel.waitForConfirms() 方法等待确认消息

void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

等待确认消息,当消息被确认,方法就会返回,若消息超时,就会抛出 TimeoutException 异常,若消息丢失,就会抛出 IOException

此外,我们记录 单独确认策略 发送消息的耗时:

public class Producer {
    public static int MESSAGE_COUNT = 200;
    public static int WAIT_TIME = 5000;

    public static void publishMessageIndividually() {
        try (Connection connection = createConnection()){
            // 开启信道
            Channel channel = connection.createChannel();
            // 开启信道确认模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);
            // 记录开始时间
            long startTime = System.currentTimeMillis();
            // 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "message " + i;
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());
                // 等待确认
                channel.waitForConfirms(WAIT_TIME);
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.printf("publish %d messages individually in %d ms\n", MESSAGE_COUNT, endTime - startTime);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}

 完整代码:

public class Producer {
    public static int MESSAGE_COUNT = 200;
    public static int WAIT_TIME = 5000;

    // 建立连接
    public static Connection createConnection() throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        return connection;
    }

    // 单独确认模式
    public static void publishMessageIndividually() {
        try (Connection connection = createConnection()){
            // 开启信道
            Channel channel = connection.createChannel();
            // 开启信道确认模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);
            // 记录开始时间
            long startTime = System.currentTimeMillis();
            // 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "message " + i;
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());
                // 等待确认
                channel.waitForConfirmsOrDie(WAIT_TIME);
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.printf("publish %d messages individually in %d ms\n", MESSAGE_COUNT, endTime - startTime);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // 单独确认模式
        publishMessageIndividually();
    }
}

运行结果:

 

可以看到,发送 200 条消息的耗时较长

且,单独确认策略是每发送一条消息后,就调用 channel.waitForConfirmsOrDie 方法,等待服务端的确认,也就是一种串行同步等待的方式,尤其是对于持久化的消息而言,需要等待消息确认存储在磁盘之后才会返回

但发布确认机制支持异步确认,即,可以一边发送消息,一边等待消息确认

我们接着看另外两种策略

Publishing Messages in Batches(批量确认)

批量确认会在每发送一批消息后,调用 waitForConfirms 方法,等待服务器的确认返回

我们每发送 50 条消息,就调用 waitForConfirms  方法进行确认:

    public static int BATCH_SIZE = 50;
    // 批量确认模式
    public static void publishMessageInBatches() {
        try (Connection connection = createConnection()){
            // 开启信道
            Channel channel = connection.createChannel();
            // 设置为 confirm 模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_2, true, false, false, null);
            // 发送消息
            int messageCount = 0;
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "message " + i;
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_2, null, message.getBytes());
                messageCount++;
                // 批量确认
                if(messageCount == BATCH_SIZE) {
                    channel.waitForConfirms(WAIT_TIME);
                    messageCount = 0;
                }
            }
            // 消息发送完,若还有未确认消息,则进行最后的确认
            if (messageCount > 0) {
                channel.waitForConfirms(WAIT_TIME);
            }
            long endTime = System.currentTimeMillis();
            System.out.printf("publish %d messages in batch in %d ms\n", MESSAGE_COUNT, endTime - startTime);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

需要注意的是,若我们发送的消息为 210 条,此时最后的十条消息未被确认,因此,我们在消息发送完成后,进行最后的确认

调用  publishMessageInBatches 方法,并观察结果:

我们可以看到,相比于单独确认策略,批量确认极大的提高了 confirm 的效率,但当出现了 Basic.Nack 或超时时,我们无法确定是哪一条消息出现了问题,客户端需要将这一批消息都进行重发,这也就重复发送了很多消息,当消息经常丢失时,批量确认的性能会不升反降

最后,我们来看 异步确认

Handling Publisher Confirms Asynchronously(异步确认)

异步确认提供了一个回调方法,服务端确认了一条或多条消息后,客户端会调用这个方法进行处理

Channel 接口提供了 addConfirmListener 方法,可以添加 ConfirmListener 回调接口

 

 ConfirmListener 接口中包含两个重要方法:

handleAck handleNack,分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack

deliveryTag 表示发送消息的序号

multiple 表示是否批量确认,开启批量确认后,若 RabbitMQ 返回的消息序号为 20,则表明 20 条消息都已经接收成功;当不开启批量确认时,若 RabbitMQ 返回的消息序号为 20 ,则表明 20 条消息被成功接收

在使用异步确认策略时,我们需要为每个 Channel 维护一个已发送消息的序号集合,当收到 RabbitMQ 的 confirm 回调时,从集合中删除掉对应消息

Channel 开启 confirm 模式后,channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号。我们可以使用 SortedSet 的有序性来维护这个已发送消息的集合

实现步骤:

1. 使用有序集合存储未确认的消息序号

            // 使用有序集合来存储未确认的消息
            SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());

2. 当收到 ack 时,从集合中删除消息序号,若为批量确认,则删除小于等于当前消息序号的所有序号

            // 进行确认
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    // 若为批量确认,则删除确认序号前所有元素
                    if (multiple) {
                        confirmSet.headSet(deliveryTag + 1).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                }
            });

3. 当接收到 nack 时,需要根据具体情况进行消息重发等操作

在这里,我们就不对其进行处理了,直接将消息清除:

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    // 处理失败,消息重发...
                    // 若为批量确认,则删除确认序号前所有元素
                    if (multiple) {
                        confirmSet.headSet(deliveryTag + 1).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                }

接着,我们发送消息,每当发送一条消息,就将其序号存储到有序集合中:

            // 发送消息
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "message " + i;
                // 获取序号
                long nextPublishSeqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());
                // 存储序号
                confirmSet.add(nextPublishSeqNo);
            }

当有序集合为空时,消息确认完,因此,我们使用 while 循环等待消息确认完毕:

            // 消息确认完毕
            while (!confirmSet.isEmpty()) {
                Thread.sleep(10);
            }

若循环体中什么也不写,while 循环执行的速度会非常快,因此,每当判断一次,我们让其等待 10 ms

完整代码:

    // 异步确认模式
    public static void handlePublishConfirmAsynchronously() {
        try (Connection connection = createConnection()){
            // 开启信道
            Channel channel = connection.createChannel();
            // 设置 confirm 模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_3, false, false, true, null);
            // 使用有序集合来存储未确认的消息
            SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
            // 进行确认
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    // 若为批量确认,则删除确认序号前所有元素
                    if (multiple) {
                        confirmSet.headSet(deliveryTag + 1).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    // 处理失败,消息重发...
                    // 若为批量确认,则删除确认序号前所有元素
                    if (multiple) {
                        confirmSet.headSet(deliveryTag + 1).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                }
            });
            // 记录开始时间
            long startTime = System.currentTimeMillis();
            // 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "message " + i;
                // 获取序号
                long nextPublishSeqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());
                // 存储序号
                confirmSet.add(nextPublishSeqNo);
            }
            // 消息确认完毕
            while (!confirmSet.isEmpty()) {
                Thread.sleep(10);
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.printf("publish %d messages and handled confirms asynchronously in %d ms\n", MESSAGE_COUNT, endTime - startTime);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

运行结果:

可以看到,三种策略中,异步确认的表现更好

完整代码:

public class Producer {
    public static int MESSAGE_COUNT = 200;
    public static int WAIT_TIME = 5000;
    public static int BATCH_SIZE = 50;

    // 建立连接
    public static Connection createConnection() throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost(Constants.HOST); // ip 的默认值为 localhost
        factory.setPort(Constants.PORT); // 默认值为 5672
        factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
        // 账号
        factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
        factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
        // 3. 创建连接 Connection
        Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
        return connection;
    }

    // 单独确认模式
    public static void publishMessageIndividually() {
        try (Connection connection = createConnection()){
            // 开启信道
            Channel channel = connection.createChannel();
            // 开启信道确认模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);
            // 记录开始时间
            long startTime = System.currentTimeMillis();
            // 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "message " + i;
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());
                // 等待确认
                channel.waitForConfirmsOrDie(WAIT_TIME);
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.printf("publish %d messages individually in %d ms\n", MESSAGE_COUNT, endTime - startTime);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    // 批量确认模式
    public static void publishMessageInBatches() {
        try (Connection connection = createConnection()){
            // 开启信道
            Channel channel = connection.createChannel();
            // 设置为 confirm 模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_2, true, false, false, null);
            // 发送消息
            int messageCount = 0;
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "message " + i;
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_2, null, message.getBytes());
                messageCount++;
                // 批量确认
                if(messageCount == BATCH_SIZE) {
                    channel.waitForConfirms(WAIT_TIME);
                    messageCount = 0;
                }
            }
            // 消息发送完,若还有未确认消息,则进行最后的确认
            if (messageCount > 0) {
                channel.waitForConfirms(WAIT_TIME);
            }
            long endTime = System.currentTimeMillis();
            System.out.printf("publish %d messages in batch in %d ms\n", MESSAGE_COUNT, endTime - startTime);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    // 异步确认模式
    public static void handlePublishConfirmAsynchronously() {
        try (Connection connection = createConnection()){
            // 开启信道
            Channel channel = connection.createChannel();
            // 设置 confirm 模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_3, false, false, true, null);
            // 使用有序集合来存储未确认的消息
            SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
            // 进行确认
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    // 若为批量确认,则删除确认序号前所有元素
                    if (multiple) {
                        confirmSet.headSet(deliveryTag + 1).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    // 处理失败,消息重发...
                    // 若为批量确认,则删除确认序号前所有元素
                    if (multiple) {
                        confirmSet.headSet(deliveryTag + 1).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                }
            });
            // 记录开始时间
            long startTime = System.currentTimeMillis();
            // 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "message " + i;
                // 获取序号
                long nextPublishSeqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());
                // 存储序号
                confirmSet.add(nextPublishSeqNo);
            }
            // 消息确认完毕
            while (!confirmSet.isEmpty()) {
                Thread.sleep(10);
            }
            // 记录结束时间
            long endTime = System.currentTimeMillis();
            System.out.printf("publish %d messages and handled confirms asynchronously in %d ms\n", MESSAGE_COUNT, endTime - startTime);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // 单独确认模式
        publishMessageIndividually();
        // 批量确认模式
         publishMessageInBatches();
        // 异步确认
         handlePublishConfirmAsynchronously();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2276788.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

备战蓝桥杯:树的存储与遍历(dfs和bfs)

树的概念 树的逻辑结构是树形结构&#xff0c;和我们之前的线性结构又不太一样了&#xff0c;是一种一对多的关系 树的结点分为根节点&#xff0c;叶子结点&#xff08;没有分支的结点&#xff09; 以及分支结点 从上往下看&#xff0c;每个结点都有0个或多个后继 从下往上…

超大规模分类(三):KNN softmax

传统的分类损失计算输入数据和每个类别中心的距离&#xff0c;来优化模型的训练。KNN softmax通过选择和输入数据最相关的top-K个类别&#xff0c;仅计算输入数据和top-K个类别中心的距离&#xff0c;以减小计算量。 KNN softmax首次诞生于达摩院机器智能技术实验室发表的SIGKD…

ubuntu官方软件包网站 字体设置

在https://ubuntu.pkgs.org/22.04/ubuntu-universe-amd64/xl2tpd_1.3.16-1_amd64.deb.html搜索找到需要的软件后&#xff0c;点击&#xff0c;下滑&#xff0c; 即可在Links和Download找到相关链接&#xff0c;下载即可&#xff0c; 但是找不到ros的安装包&#xff0c; 字体设…

项目实战——使用python脚本完成指定OTA或者其他功能的自动化断电上电测试

前言 在嵌入式设备的OTA场景测试和其他断电上电测试过程中&#xff0c;有的场景发生在夜晚或者随时可能发生&#xff0c;这个时候不可能24h人工盯着&#xff0c;需要自动化抓取串口日志处罚断电上电操作。 下面的python脚本可以实现自动抓取串口指定关键词&#xff0c;然后触发…

电脑分辨率调到为多少最佳?电脑分辨率最佳设置

电脑分辨率是指电脑屏幕上显示的像素点的数量&#xff0c;通常用水平和垂直方向的像素点数来表示&#xff0c;例如19201080。像素点越多&#xff0c;显示的内容就越清晰&#xff0c;但也会占用更多的系统资源和电力。那么多电脑分辨率多少最佳&#xff1f;以及电脑分辨率如何调…

代码随想录算法【Day20】

Day20 二叉搜索树 235. 二叉搜索树的最近公共祖先 理解只要当前节点的值在p和q节点的值的中间&#xff0c;那这个值就是最近的公共祖先&#xff0c;绝对不是次近的&#xff0c;这个题就好做了。 递归法 二叉搜索树本身是有序的&#xff0c;所以不涉及到前中后序的遍历 cl…

【SpringBoot】@Value 没有注入预期的值

问题复现 在装配对象成员属性时&#xff0c;我们常常会使用 Autowired 来装配。但是&#xff0c;有时候我们也使用 Value 进行装配。不过这两种注解使用风格不同&#xff0c;使用 Autowired 一般都不会设置属性值&#xff0c;而 Value 必须指定一个字符串值&#xff0c;因为其…

车联网安全 -- 数字证书到底证明了什么?

在车联网安全--TLS握手过程详解里面&#xff0c;我们了解到握手时&#xff0c;Server会向Client发送Server Certificate&#xff0c;用于证明自己的身份合法&#xff0c;为什么会有这一步呢&#xff1f; 我们回顾一下数字签名的过程&#xff1a; Bob将使用自己的公钥对“Hello…

Elasticsarch:使用全文搜索在 ES|QL 中进行过滤 - 8.17

8.17 在 ES|QL 中引入了 match 和 qstr 函数&#xff0c;可用于执行全文过滤。本文介绍了它们的作用、使用方法、与现有文本过滤方法的区别、当前的限制以及未来的改进。 ES|QL 现在包含全文函数&#xff0c;可用于使用文本查询过滤数据。我们将回顾可用的文本过滤方法&#xf…

【HTML+CSS+JS+VUE】web前端教程-31-css3新特性

圆角 div{width: 100px;height: 100px;background-color: saddlebrown;border-radius: 5px;}阴影 div{width: 200px;height: 100px;background-color: saddlebrown;margin: 0 auto;box-shadow: 10px 10px 20px rgba(0, 0, 0, 0.5);}

Spring Boot 项目自定义加解密实现配置文件的加密

在Spring Boot项目中&#xff0c; 可以结合Jasypt 快速实现对配置文件中的部分属性进行加密。 完整的介绍参照&#xff1a; Spring Boot Jasypt 实现application.yml 属性加密的快速示例 但是作为一个技术强迫症&#xff0c;总是想着从底层开始实现属性的加解密&#xff0c;…

若依前后端分离项目部署(使用docker)

文章目录 一、搭建后端1.1 搭建流程&#xff1a;1.2 后端零件:1.2.1 mysql容器创建&#xff1a;1.2.2 redis容器创建&#xff1a;1.2.3 Dockerfile内容&#xff1a;1.2.4 构建项目镜像&#xff1a;1.2.5 创建后端容器&#xff1a; 二、前端搭建&#xff1a;2.1 搭建流程&#x…

Vue2+OpenLayers使用Overlay实现点击获取当前经纬度信息(提供Gitee源码)

目录 一、案例截图 二、安装OpenLayers库 三、代码实现 关键参数&#xff1a; 实现思路&#xff1a; 核心代码&#xff1a; 完整代码&#xff1a; 四、Gitee源码 一、案例截图 二、安装OpenLayers库 npm install ol 三、代码实现 覆盖物&#xff08;Overlay&#xf…

Oracle 终止正在执行的SQL

目录 一. 背景二. 操作简介三. 投入数据四. 效果展示 一. 背景 项目中要求进行性能测试&#xff0c;需要向指定的表中投入几百万条数据。 在数据投入的过程中发现投入的数据不对&#xff0c;需要紧急停止SQL的执行。 二. 操作简介 &#x1f449;需要DBA权限&#x1f448; ⏹…

Oopsie【hack the box】

Oopsie 解题流程 文件上传 首先开启机器后&#xff0c;我们先使用 nmap -sC -SV来扫描一下IP地址&#xff1a; -sC&#xff1a;使用 Nmap 的默认脚本扫描&#xff08;通常是 NSE 脚本&#xff0c;Nmap Scripting Engine&#xff09;。这个选项会自动执行一系列常见的脚本&am…

V少JS基础班之第四弹

一、 前言 第四弹内容是操作符。 本章结束。第一个月的内容就完成了&#xff0c; 是一个节点。 下个月我们就要开始函数的学习了。 我们学习完函数之后。很多概念就可以跟大家补充说明了。 OK&#xff0c;那我们就开始本周的操作符学习 本系列为一周一更&#xff0c;计划历时6…

【STM32-学习笔记-7-】USART串口通信

文章目录 USART串口通信Ⅰ、硬件电路Ⅱ、常见的电平标准Ⅲ、串口参数及时序Ⅳ、STM32的USART简介数据帧起始位侦测数据采样波特率发生器 Ⅴ、USART函数介绍Ⅵ、USART_InitTypeDef结构体参数1、USART_BaudRate2、USART_WordLength3、USART_StopBits4、USART_Parity5、USART_Mode…

Docker 安装开源的IT资产管理系统Snipe-IT

一、安装 1、创建docker-compose.yaml version: 3services:snipeit:container_name: snipeitimage: snipe/snipe-it:v6.1.2restart: alwaysports:- "8000:80"volumes:- ./logs:/var/www/html/storage/logsdepends_on:- mysqlenv_file:- .env.dockernetworks:- snip…

达梦8-DMSQL程序设计学习笔记1-DMSQL程序简介

1、DMSQL程序简介 DMSQL程序是达梦数据库对标准SQL语言的扩展&#xff0c;是一种过程化SQL语言。在DMSQL程序中&#xff0c;包括一整套数据类型、条件结构、循环结构和异常处理结构等&#xff0c;DMSQL程序中可以执行SQL语句&#xff0c;SQL语句中也可以使用DMSQL函数。 DMSQ…

NLP中常见的分词算法(BPE、WordPiece、Unigram、SentencePiece)

文章目录 一、基本概念二、传统分词方法2.1 古典分词方法2.2 拆分为单个字符 三、基于子词的分词方法&#xff08;Subword Tokenization&#xff09;3.1 主要思想3.2 主流的 Subword 算法3.3 Subword 与 传统分词方法的比较 四、Byte Pair Encoding (BPE)4.1 主要思想4.2 算法过…