目录
1. 7种工作模式
1.1 Simple(简单模式)
1.2 Work Queue(工作队列)
1.3 Publish/Subscribe(发布/订阅)
1.4 Routing(路由模式)
1.5 Topics(通配符模式)
1.6 RPC(RPC 通信)
1.7 Publisher Confirms(发布确认)
2. 工作模式的使用案例
2.1 简单模式
2.2 Work Queue(工作队列)
2.3 Publish/Subscribe(发布 / 订阅)
2.4 Routing(路由模式)
2.5 Topics(通配符模式)
2.6 RPC(RPC 模式)
2.7 Publisher Confirms(发布确认)
1. 7种工作模式
1.1 Simple(简单模式)
P:生产者,发送消息的程序
C:消费者,消息的接收者
Queue:消息队列,可以缓存消息,生产者给里面发送消息,消费者从里面取出消息
一个生产者,一个消费者,也称为点对点(Point-to-Point)模式
适用场景:消息只能被单个消费者处理
1.2 Work Queue(工作队列)
一个生产者,多个消费者,在多个消息的情况下,Work Queue 会将消息分发给不同的消费者,每个消费者都会收到不同的消息
例如队列中有 10 条消息,两个消费者共同消费者 10 条消息,消息不会重复消费
使用场景:集群环境中做异步处理
比如 12306 短信通知服务,订票成功后,订单消息会发送到 RabbitMQ,短信服务从 RabbitMQ 中获取订单信息,并发送通知短信
1.3 Publish/Subscribe(发布/订阅)
Exchange:交换机(X)
作用:生产者将消息发送到 X,由交换机将消息按一定规则路由到一个或者多个队列中
RabbitMQ 交换机由 4 中类型,fanout、direct、topic、headers,不同类型有着不同的路由策略
Fanout:广播,把消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)
Direct:定向,把消息交给符合指定 routing key 的队列(Routing 模式)
Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列(Topics 模式)
headers 类型的交换机不依赖路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 树形来进行匹配,headers 类型的交换机性能很差,基本不实用
其中 Exchange 只负责发消息,不具备存储消息的能力,如果没有任何队列与 Exchange 绑定或者没有符合路由规则的队列,那么消息就会丢失
RoutingKey:路由键,生产者将消息发送给交换机时,指定一个字符串,用来告诉交换机应该如何处理这个消息
BindingKey:绑定,RabbitMQ 中通过 Binding(绑定),将交换机与队列关联起来,在绑定时指定一个 BindingKey,这样 RabbitMQ 就知道如何正确地将消息路由到队列了
例如当 RoutingKey 为 Bining Key1 时,消息就会路由到第一个队列,为 Binding Key2 时,消息路由到第二个队列
Publish/Subscribe 模式中,一个生产者 P,多个消费者 C1,C2,X 代表交换机消息复制多分,每个消费者接收相同的消息
适合场景:消息需要被多个消费者同时接收的场景,如:实时通知或者广播消息
1.4 Routing(路由模式)
路由模式是发布订阅模式的变化,在发布订阅的基础上,增加路由 key,发布模式是将所有消息分发给所有的消费者,路由模式是 Exchange 根据 RoutingKey 的规则,将数据删选后发给对应的消费者队列
适合场景:需要根据特定规则分发消息的场景
例如,打印日志,日志的等级为 error、warning、info、debug,就可以通过这种模式,把不同的日志发送到不同的队列
1.5 Topics(通配符模式)
路由模式的升级,在 routingKey 的基础上,增加了通配符的功能,Topics 和 Routing 的基本原理相同,生产者将消息发送给交换机,交换机根据 RoutingKey 将消息转发给与 RoutingKey 匹配的队列
Routing 模式是相等匹配,而 Topics 模式是通配符匹配
适合场景:需要灵活配置和过滤消息的场景
1.6 RPC(RPC 通信)
在 RPC 通信过程中,没有生产者和消费者,通过两个队列实现了一个可回调的过程
1)客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段制定了一个回调队列,用于接收服务器的响应
2)服务器接收到请求后,处理请求并发送消息到 replyTo 指定的回调队列
3)客户端在回调队列上等待响应消息,一旦收到响应,客户端会检查消息 correlationId 属性,以确保它是所期望的响应
1.7 Publisher Confirms(发布确认)
Publisher Confirms 模式是 RabbitMQ 提供的一种消息可靠发送到 RabbitMQ 服务器的机制,在这种模式下,生产者可以等待 RabbitMQ 服务器的确认,以确保消息已经被服务器接收并处理
1)生产者将 Channel 设置为 confirm 模式后,发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,一遍跟踪消息的状态
2)当消息被 RabbitMQ 服务器接收并处理后,服务器会异步地向生产者发送一个(ACK)给生产者,表达消息已经发送
适用场景:对数据安全性要求较高的场景
2. 工作模式的使用案例
2.1 简单模式
简单模式在上一篇写了,这里省略
2.2 Work Queue(工作队列)
工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收
步骤:
1)引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
2)编写生产者代码
定义常量类,把端口,账号,密码都设置好
public class Constant {
public static final String HOST = "44.34.51.65";
public static final int PORT = 5672;
public static final String USER_NAME = "lk";
public static final String PASS_WORD = "lk";
public static final String VIRTUAL_HOST = "study";
//工作队列模式
public static final String WORK_QUEUE = "work.queue";
}
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constant.USER_NAME); //账号
connectionFactory.setPassword(Constant.PASS_WORD); //密码
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列 使用内置交换机
//如果队列不存在,则创建,如果队列存在,则不创建
channel.queueDeclare(Constant.WORK_QUEUE,true,false,false,null);
//4. 发送消息
for (int i = 0; i < 10; i++) {
String msg = "hello work queue" + i;
channel.basicPublish("",Constant.WORK_QUEUE,null,msg.getBytes());
}
System.out.println("消息发送成功");
//6. 释放资源
channel.close();
connection.close();
}
}
3)编写消费者代码
public class Comsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constant.WORK_QUEUE,true,false,false,null);
//4. 消费队列
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:" + new String(body));
}
};
channel.basicConsume(Constant.WORK_QUEUE,true,consumer);
}
}
public class Comsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constant.WORK_QUEUE,true,false,false,null);
//4. 消费队列
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:" + new String(body));
}
};
channel.basicConsume(Constant.WORK_QUEUE,true,consumer);
}
}
运行程序,观察结果
由于消息较少,处理较快,如果先启动生产者,在启动消费者,第一个消费者就会瞬间把 10 条消息消费掉,因此先启动消费者
Consumer1:
Consumer2:
可以看到有两个消费者,消费的消息都是不同的
2.3 Publish/Subscribe(发布 / 订阅)
在发布 / 订阅模型中,多了一个 Exchange 角色
1)编写生产者代码
需要创建交换机,并且绑定队列和交换机
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constant.USER_NAME); //账号
connectionFactory.setPassword(Constant.PASS_WORD); //密码
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明交换机
/**
* exchange:交换机
* type:交换机类型
* durable:持久化
*/
channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);
//4. 声明队列
channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);
channel.queueDeclare(Constant.FANOUT_QUEUE2,true,false,false,null);
//5. 交换机和队列绑定
/**
* queue:队列
* exchange:交换机
* routingKey:
*/
channel.queueBind(Constant.FANOUT_QUEUE1,Constant.FANOUT_EXCHANGE,"");
channel.queueBind(Constant.FANOUT_QUEUE2,Constant.FANOUT_EXCHANGE,"");
//6. 发布消息
String msg = "hello fanout";
channel.basicPublish(Constant.FANOUT_EXCHANGE,"",null,msg.getBytes());
System.out.println("消息发送成功");
//7. 释放资源
channel.close();
connection.close();
}
}
2)编写消费者代码
public class Comsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);
//4. 消费队列
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:" + new String(body));
}
};
channel.basicConsume(Constant.FANOUT_QUEUE1,true,consumer);
}
}
public class Comsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constant.FANOUT_QUEUE2,true,false,false,null);
//4. 消费队列
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:" + new String(body));
}
};
channel.basicConsume(Constant.FANOUT_QUEUE2,true,consumer);
}
}
运行程序,观察结果
生产者
消费者1
消费者2
两个队列分别有了一条消息
Exchange 多了队列的绑定关系
2.4 Routing(路由模式)
和发布 / 订阅模式不同的是,队列和交换机的绑定,不能是任意绑定了,而是要指定一个 BindingKey(RoutingKey 的一种),消息的发送方向 Exchange 发送消息时,也需要指定消息的 RoutingKey,交换机需要根据消息的 RoutingKey 进行判断,只有队列绑定时的 Binding 和发送消息的 RoutingKey 完全一致,才会接收到消息
1)编写生产者代码
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Constant.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true);
//4. 声明队列
channel.queueDeclare(Constant.DIRECT_QUEUE1,true,false,false,null);
channel.queueDeclare(Constant.DIRECT_QUEUE2,true,false,false,null);
//5. 绑定交换机和队列
channel.queueBind(Constant.DIRECT_QUEUE1,Constant.DIRECT_EXCHANGE,"a");
channel.queueBind(Constant.DIRECT_QUEUE2,Constant.DIRECT_EXCHANGE,"a");
channel.queueBind(Constant.DIRECT_QUEUE2,Constant.DIRECT_EXCHANGE,"b");
channel.queueBind(Constant.DIRECT_QUEUE2,Constant.DIRECT_EXCHANGE,"c");
//6. 发送消息
String msg = "hello direct, my routingKey is a";
channel.basicPublish(Constant.DIRECT_EXCHANGE,"a",null,msg.getBytes());
msg = "hello direct, my routingKey is b";
channel.basicPublish(Constant.DIRECT_EXCHANGE,"b",null,msg.getBytes());
msg = "hello direct, my routingKey is c";
channel.basicPublish(Constant.DIRECT_EXCHANGE,"c",null,msg.getBytes());
System.out.println("发送消息成功");
//7. 释放资源
channel.close();
connection.close();
}
}
2)编写消费者代码
public class Comsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constant.DIRECT_QUEUE1,true,false,false,null);
//4. 消费队列
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:" + new String(body));
}
};
channel.basicConsume(Constant.DIRECT_QUEUE1,true,consumer);
}
}
public class Comsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constant.DIRECT_QUEUE2,true,false,false,null);
//4. 消费队列
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:" + new String(body));
}
};
channel.basicConsume(Constant.DIRECT_QUEUE2,true,consumer);
}
}
运行程序,观察结果
生产者
消费者1
消费者2
可以看到队列 1 里面有一条消息,队列 2 里面有 3 条消息,符合路由模式
exchange 下队列和 RoutingKey 的绑定关系
2.5 Topics(通配符模式)
Topics 模式使用的交换机类型为 topic,Topics 类型在匹配规则上进行了扩展,BindingKey 支持通配符匹配
Topics 类型的交换机在匹配上的规则:
1)RoutingKey 是一系列由(.)分割的单词,例如"stock.usd.nyse"
2)BindingKey 和 RoutingKey 一样,也是(.)分割的字符串
3)BindingKey 中可以存在两种特殊的字符串,用于模糊匹配
*表示一个单词,#表示多个单词
1)编写生产者代码
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Constant.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true);
//4. 声明队列
channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);
channel.queueDeclare(Constant.TOPIC_QUEUE2,true,false,false,null);
//5. 绑定交换机和队列
channel.queueBind(Constant.TOPIC_QUEUE1,Constant.TOPIC_EXCHANGE,"*.a.*");
channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"*.*.b");
channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"c.#");
//6. 发送消息
String msg = "hello topic, my routingKey is ae.a.f";
channel.basicPublish(Constant.TOPIC_EXCHANGE,"ae.a.f",null,msg.getBytes()); //发送到队列1
msg = "hello direct, my routingKey is ef.a.b";
channel.basicPublish(Constant.TOPIC_EXCHANGE,"ef.a.b",null,msg.getBytes());//发送到队列1和队列2
msg = "hello direct, my routingKey is c.ef.d";
channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.ef.d",null,msg.getBytes()); //发送到队列2
System.out.println("发送消息成功");
//7. 释放资源
channel.close();
connection.close();
}
}
2)编写消费者代码
public class Comsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);
//4. 消费队列
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:" + new String(body));
}
};
channel.basicConsume(Constant.TOPIC_QUEUE1,true,consumer);
}
}
public class Comsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constant.TOPIC_QUEUE2,true,false,false,null);
//4. 消费队列
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:" + new String(body));
}
};
channel.basicConsume(Constant.TOPIC_QUEUE2,true,consumer);
}
}
运行程序,观察结果
可以看到队列的消息数
消费者1
消费者2
2.6 RPC(RPC 模式)
通过两个队列实现一个可回调的过程
1)编写客户端代码
客户端代码流程:
声明两个队列,包含回调队列 replyQueueName ,声明本次请求的唯一标志 corrId
将 replyQueueName 和 corrId 配置到要发送的消息队列中
使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
阻塞队列由消息后,主线程被唤醒,打印返回内容
public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
/**
* RPC 客户端
* 1. 发送请求
* 2. 接收响应
*/
public class RpcClient {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(Constant.RPC_REQUEST_QUEUE,true,false,false,null);
channel.queueDeclare(Constant.RPC_RESPONSE_QUEUE,true,false,false,null);
//3. 发送请求
String msg = "hello rpc";
//设置请求的唯一标识
String correlationID = UUID.randomUUID().toString();
//设置请求的相关属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.correlationId(correlationID)
.replyTo(Constant.RPC_RESPONSE_QUEUE)
.build();
channel.basicPublish("",Constant.RPC_REQUEST_QUEUE,properties,msg.getBytes());
//4. 接收消息
//使用阻塞队列来存储响应信息
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String resMsg = new String(body);
System.out.println("接收到回调消息:" + resMsg);
if (correlationID.equals(properties.getCorrelationId())) {
//说明 correlationID 校验一致
response.offer(resMsg);
}
}
};
channel.basicConsume(Constant.RPC_RESPONSE_QUEUE,true,consumer);
String result = response.take();
System.out.println("RPC Client响应结果:" + result);
}
}
2)编写服务端代码
/**
* RPC server
* 1. 接收请求
* 2. 发送响应
*/
public class RpcServer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 接收请求
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body);
System.out.println("接收到请求:" + request);
String response = "针对 request:" + request + "响应成功";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("",Constant.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes());
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(Constant.RPC_REQUEST_QUEUE,false,consumer);
}
}
先运行客户端
可以看到客户端这边产生了阻塞,客户端是 request 的生产者,是 response 的消费者
运行服务端
可以看到服务器端接收到了请求之后,发送响应,客户端这边正确收到了响应
2.7 Publisher Confirms(发布确认)
作为消息中间件,都会面临消息丢失的问题,其中消息丢失分为三种情况:
1)生产者问题,因为程序故障,网络抖动等各种原因,生产者没有成功向 broker 发送消息
2)消息中间件自身问题,生产者成功发送给了 Broker,但是 Broker 没有把消息保存好导致丢失
3)消费者问题,Broker 发送消息到消费者,因为消费者没有处理好,导致 Broker 将消息从队列中删除了
情况1,可以采用发布确认机制实现
生产者将信道设置为 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后, RabbitMQ 就会发送一个确认给生产者(包含消息的唯一 ID),使生产者直到消息已经正确到达目的队列了
其中 deliveryTag 包含了确认消息的序号,此处 broker 也可以设置 channel.basicAck 方法中的 multiple参数,标识这个序号之前的所有消息都已经得到了处理
发送方确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息
1)当消息最终确认之后,生产者可以通过回调方法来处理该确认消息
2)如果 RabbitMQ 因为自身错误导致消息丢失们就会发送一条 nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该 nack 命令
发布确认有 3 中策略 Publishing Messages Individually(单独确认)、Publishing Messages in Batches(批量确认)、Handling Publisher Confirms Asynchronously(异步确认),此处将 3 种代码写在一起
public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirm.queue1";
public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirm.queue2";
public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirm.queue3";
public class PublisherConfirm {
private static final Integer MESSAGE_COUNT = 200;
static Connection createConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASS_WORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
return connectionFactory.newConnection();
}
public static void main(String[] args) throws Exception {
//单独确认
publishingMessagesIndividually();
//批量确认
publishingMessagesInBatches();
//异步确认
handlingPublisherConfirmsAsynchronously();
}
/**
* 异步确认
*/
private static void handlingPublisherConfirmsAsynchronously() throws Exception {
try (Connection connection = createConnection()) {
//1. 开启信道
Channel channel = connection.createChannel();
//2. 设置信道为 confirm 模式
channel.confirmSelect();
//3. 声明队列
channel.queueDeclare(Constant.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);
//4. 监听 confirm
//集合中存储的是未确认的消息 ID
long start = System.currentTimeMillis();
//有序集合,元素按照自然顺序进行排序,存储未 confirm 消息序号
SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// multiple 批量
// confirmSeqNo.headSet(n),方法返回当前集合中小于 n 的集合
if (multiple) {
//批量确认:将集合中小于等于当前序号 deliveryTag 元素的集合删除,标识这批序号的消息被 ack 了
confirmSeqNo.headSet(deliveryTag + 1).clear();
}else {
//单挑确认:将当前的 deliveryTag 从集合中移除
confirmSeqNo.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSeqNo.headSet(deliveryTag + 1).clear();
}else {
confirmSeqNo.headSet(deliveryTag);
}
//业务需要根据实际场景来处理,例如重发,整理省略
}
});
//5. 发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms" + i;
//拿到消息的 ID
long seqNo = channel.getNextPublishSeqNo();
//发送消息时,会带着序号
channel.basicPublish("",Constant.PUBLISHER_CONFIRMS_QUEUE3,null,msg.getBytes());
//将消息的序号添加到有序集合,表示这个消息发送过去了但还未确认
confirmSeqNo.add(seqNo);
}
while (!confirmSeqNo.isEmpty()) {
Thread.sleep(10);
}
long end = System.currentTimeMillis();
System.out.printf("异步确认策略,消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT,end - start);
}
}
/**
* 批量确认
*/
private static void publishingMessagesInBatches() throws Exception {
try (Connection connection = createConnection()) {
//1. 开启信道
Channel channel = connection.createChannel();
//2. 设置信道为 confirm 模式
channel.confirmSelect();
//3. 声明队列
channel.queueDeclare(Constant.PUBLISHER_CONFIRMS_QUEUE2,true,false,false,null);
//4. 发送消息
long start = System.currentTimeMillis();
int batchSize = 100;
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms" + i;
channel.basicPublish("",Constant.PUBLISHER_CONFIRMS_QUEUE2,null,msg.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
//当 outstandingMessageCount 和 batchSize 相等,就等待 5000 ms 之后在批量确认
channel.waitForConfirmsOrDie(5000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.printf("批量确认策略,消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT,end - start);
}
}
/**
* 单独确认
*/
private static void publishingMessagesIndividually() throws Exception {
try (Connection connection = createConnection()) {
//1. 开启信道
Channel channel = connection.createChannel();
//2. 设置信道为 confirm 模式
channel.confirmSelect();
//3. 声明队列
channel.queueDeclare(Constant.PUBLISHER_CONFIRMS_QUEUE1,true,false,false,null);
//4. 发送消息,并等待确认
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms" + i;
channel.basicPublish("",Constant.PUBLISHER_CONFIRMS_QUEUE1,null,msg.getBytes());
//等待确认消息,只要消息被确认,这个方法就会被返回,如果超时过期,则抛出 TimeoutException,如果消息被
//nack(丢失),waitForConfirmsOrDie将抛出IOException
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.printf("单独确认策略,消息条数: %d,耗时: %d ms \n",MESSAGE_COUNT,end - start);
}
}
}
运行程序,观察结果
从上述程序可以看出,单独确认效率最低,而异步确认消息最高
单独确认:
这种策略时每发送一条消息后就调用 channel.waitForConfirmsOrDie 方法,之后等待服务端的确认,实际上时一种串行同步等待的方式,对于持久化的消息来说,需要等待消息确认存储在硬盘之后才会返回
批量确认:
每发送一条消息后,调用 channel.waitForConfirmsOrDie 方法,等待服务端的确认返回,相比单独确认,数据量越大,效率越高,缺点是出现 Basic.Nack 或者超时时,不清楚是那条消息出现了问题,客户端需要将者一批消息全部重发,当消息经常丢失时,批量确认的性能不升反降
异步确认:
异步 confirm 方法实现最为复杂,Channel 接口提供了一个方法 addConfirmListener 这个方法,可以添加 ConfirmListener 回调接口
ConfirmListener 接口中包含两个方法:handleAck(long deliveryTag,boolean multiple)和 handleNack(long deliveryTag,boolean multiple),分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack
deliveryTag 表示发送消息的序号,multiple 表示是否批量确认