🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
目录
- 1. 7种工作模式介绍
- 1.1 简单模式(simple)
- 1.2 工作队列(Work Queue)
- 1.3 对交换机和路由键的解释
- 1.4 广播模式/发布订阅模式(Publish/Subscribe)
- 1.5 路由模式(Routing)
- 1.6 通配符模式(Topics)
- 1.7 RPC通信(RPC)
- 1.8 发布确认(Publish Confirms)
- 2. 工作模式的代码实现
- 2.1 简单模式
- 2.2 工作队列
- 2.3 广播模式/发布订阅模式(Publish/Subscribe)
- 2.4 路由模式(Routing)
- 2.5 通配符模式
- 2.6 RPC通信(RPC)
- 2.7 发布确认(Publisher Confirms/消息可靠性保证)
- 2.7.1 概述
- 2.7.2 单独确认
- 2.7.3 批量确认
- 2.7.4 异步确认
1. 7种工作模式介绍
1.1 简单模式(simple)
P: producer生产者,也就是要发送消息的程序.
C: consumer消费者,消息的接收者.
Queue: 消息队列,其中可以缓存信息,生产者可以向其中投递信息,消费者从其中获取信息.
特点: 一个生产者,一个消费者,一个消息只能被消费一次,也称为点对点模式.
1.2 工作队列(Work Queue)
一个生产者P,多个消费者C1,C2,在队列中有多个消息的情况下,work Queue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息.
特点: 消息不会重复,分配给不同的消费者.
1.3 对交换机和路由键的解释
- X: 代表的是交换机(exchange),作用就是生产者在将消息发送到交换机之后,交换机会按照生产者指定的RoutingKey,也就是路由规则把消息路由到一个或者多个队列中.
- RabbitMQ中的交换机有四种类型: fanout,direct,topic,headers,不同的类型有不同的路由策略.
- fanout: 广播模式,将消息发送给所有绑定交换机的队列.(Publish/Subscribe模式)
- direct: 定向模式,将消息交给符合指定的routingKey的队列.(Routing模式)
- topic: 通配符模式,将消息交给符合指定的Routing pattern的队列.(Topics模式).
- header: headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在.
交换机只是负责转发消息,不具有存储消息的功能,因此,如果没有任何队列和交换机绑定,或者是消息没有任何符合路由规则的队列,那么信息就会丢失.
- RoutingKey: 路由键,生产者在消息转发给交换机的时候,指定的一个字符串,用来让交换机知道该如何处理这条消息.
- BindingKey: 绑定键,BindingKey是把交换机和消息队列绑定的字符串,这样交换机就会知道如何根据路由键将对应的消息转发到指定的队列了.
下面我们就举一个具体的例子来对RoutingKey和BindingKey具体说明.
在发送消息的时候,生产者设置了RoutingKey为orange,消息就会路由到Q1.
BindingKey其实也属于RoutingKey的一种,我们为了把这两个键混淆,我们可以这样理解:
在交换机和队列绑定的时候,需要的路由键是BindingKey.在发送消息的时候,需要的路由键是RoutingKey.
1.4 广播模式/发布订阅模式(Publish/Subscribe)
一个生产者P,多个消费者C1,C2,X代表交换机,交换机会将消息复制多份,发送给所有和交换机绑定的队列,每个消费者接收相同的消息.生产者发送⼀条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者.
适用场景: 消息需要被多个消费者同时接收的场景.比如: 实时通知或者广播消息.
比如国家地震局发送地震预警,地震发生的时候,需要把预警消息发送给可能有震感地区的所有电子设备.
1.5 路由模式(Routing)
路由模式是发布订阅模式的变种,在发布订阅模式的基础上,增加了路由key.
相比广播模式,路由模式是Exchange根据RoutingKey的规则,将数据筛选后发给对应的消费者队列.
适合场景: 需要根据特定的规则分发消息的场景.
比如我们在Spring中学习的日志,日志等级分为error,warning,info,debug,就可以通过这种模式,把不同的日志发送到不同的队列.
1.6 通配符模式(Topics)
路由模式的升级版,在routingKey的基础上,增加了通配符的功能,使之更加灵活.其中,一个.
是一个节,使用*
代表的是一个节,使用#
代表的是多个节.Topics和Routing的基本原理相同.不同之处是:routingKey的匹配方式不同,Routing模式是相等匹配,topics模式是通配符匹配.
适用场景: 需要灵活匹配和过滤消息的场景.
1.7 RPC通信(RPC)
在RPC模式中没有生产者和消费者,大概就是通过两个队列实现了一个消息回调的过程.有点类似与我们在网络中学习的"请求和响应",这个功能是MQ的额外功能.
- 客户端发送消息到一个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了回调队列,用于接收服务端返回的响应.
- 服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列.
- ⼀旦收到响应,客户端会检查消息的correlationId属性,以确保它是所期望的响应.因为队列中不仅仅有一条消息,保证发送的请求和收到的响应通过correlationId对应的上.
1.8 发布确认(Publish Confirms)
Publish Confirms模式是RabbitMQ提供的一种确保消息可靠发送到RabbitMQ服务器的机制.在这种模式之下,生产者可以等待RabbitMQ服务器确认,可以确保消息已经被服务器接收并处理.
- 生产者将Channel设置为Confirm模式,发布的每一条消息都会获得一个唯一的ID,生产者可以将这些序号与消息关联起来,以便跟踪消息的状态.
- 当消息被RabbitMQ服务器接收并处理之后,服务器会一步地向生产者发送一个(ACK)给生产者(其中包含消息的唯一ID),表名消息已经送达.
通过该模式,生产者可以确保消息被RabbitMQ服务器成功接收,从而避免消息丢失的问题.
适用场景: 对数据安全性较高的场景,比如金融交易,订单处理.
2. 工作模式的代码实现
前面我们对这几种工作模式有了简单的了解,接下来我们学习他们的写法.
2.1 简单模式
就是快速上手中的程序,此处忽略.
2.2 工作队列
就是简单模式的增强版,和简单模式下最大的区别就是,工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被⼀个消费者接收.
- 引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
- 编写生产者
工作队列模式的代码和简单模式的代码没有多大的出入.我们只是发送了10次消息.我们把发送消息的地方改为10次发送.
有一些配置相关的东西是固定的,所以我们可以把他们单独提出来一个Constant类.
public class Constant {
public static String HOST = "39.105.137.64";
public static int PORT = 5672;
public static String USER_NAME = "jiangruijia";
public static String PASSWORD = "qwe123524";
public static String QUEUE_NAME = "work";
public static String VIRTUAL_HOST = "/";
}
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.QUEUE_NAME,true,false,false,null);
for (int i = 0; i < 10 ; i++){//发送10次消息
String message = "hello work~" + i;
channel.basicPublish("",Constant.QUEUE_NAME,null,message.getBytes());
}
channel.close();
connection.close();
}
}
- 消费者代码
public class Consumer2 {//两个消费者是竞争关系
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.QUEUE_NAME,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("接收到消息" + message);
}
};
channel.basicConsume(Constant.QUEUE_NAME,true,consumer);
//先不要关闭资源,因为需要先开启消费者,等待生产者发送消息
}
}
另一个消费者和这个消费者相同,直接复制粘贴一份.
- 运行程序,观察结果
先启动两个消费者(而且两个消费者不可以关闭资源),再运行生产者,如果先启动生产者,在启动第一个消费者的时候,消息会被瞬间消费完.
2.3 广播模式/发布订阅模式(Publish/Subscribe)
- 编写生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明交换机.
channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true,false,false,null);
//声明队列
channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);
channel.queueDeclare(Constant.FANOUT_QUEUE2,true,false,false,null);
//绑定队列与交换机
channel.queueBind(Constant.FANOUT_QUEUE1,Constant.FANOUT_EXCHANGE,"");
channel.queueBind(Constant.FANOUT_QUEUE2,Constant.FANOUT_EXCHANGE,"");
String message = "发送广播消息";
channel.basicPublish(Constant.FANOUT_EXCHANGE,"",null,message.getBytes());
channel.close();
connection.close();
}
}
参数解释:
- exchangeDeclare: 第一个参数是交换机名称,第二个参数是交换机的路由规则(这里指定为FANOUT广播类型),第三个参数是是否持久化,如果设置持久化,那么在重启服务之后,交换机不会被释放,第四个参数是是否自动删除,当没有队列与其绑定的时候,它就会被删除.第五个参数是是否是内部使用的,一般情况下为false,第六个参数是指定相关参数.
- queueDeclare: 声明队列,我们在之前解释过,这里不再赘述.
- queueBind: 绑定队列与交换机,第一个参数是队列名称,第二个参数是交换机名称,第三个参数是交换机和队列之间的路由规则,在这里我们是广播模式,所以我们没有指定路由规则,指定为默认的""
.
- basicPublish: 第一个参数是发送消息的交换机,第二个参数是发送消息时的路由关键字,这里为广播模式,所以我们指定为""
,第三个参数是一些相关配置,第四个参数是发送的消息.
- 编写消费者代码
虽然在消费者中可以不用声明队列,但是为有时候生产者和消费者不会在一个主机上,我们还是加上队列的声明比较符合逻辑,队列存在的时候,不会重复创建队列.
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//队列声明可以省略,如果队列已经存在,则不会创建队列
channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("获取到广播消息:"+message);
}
};
channel.basicConsume(Constant.FANOUT_QUEUE1,true,consumer);
}
}
第二个消费者直接复制一份,改掉消费者于队列的绑定和队列的声明即可.
- 运行程序
首先启动两个消费者,再运行生产者.
我们看到生产者生产出两条消息之后,迅速被消费者消费掉.
两个消费者接收到了消息:
2.4 路由模式(Routing)
相比于发布订阅模式,交换机和队列不可以是任意绑定了==,而是需要指定一个BindingKey(RoutingKey的一种)==.生产者在向交换机发送消息的时候,也需要指定RoutingKey.这时,Exchange也不再把消息交给每⼀个绑定的key,而是根据消息RoutingKey进行判断,只有队列绑定时的BindingKey和发送消息的RoutingKey完全⼀致,才会接收到消息.
- 编写生产者
与发布订阅模式的不同是,交换机类型不同,而且绑定队列的bindingKey不同.
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Constant.ROUTING_EXCHANGE, BuiltinExchangeType.DIRECT,true,false,false,null);
channel.queueDeclare(Constant.ROUTING_QUEUE1,true,false,false,null);
channel.queueDeclare(Constant.ROUTING_QUEUE2,true,false,false,null);
channel.queueBind(Constant.ROUTING_QUEUE1,Constant.ROUTING_EXCHANGE,"a");
channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"a");
channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"b");
channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"c");
String message1 = "routingKey_a";
String message2 = "routingKey_b";
String message3 = "routingKey_c";
channel.basicPublish(Constant.ROUTING_EXCHANGE,"a",null,message1.getBytes());
channel.basicPublish(Constant.ROUTING_EXCHANGE,"b",null,message2.getBytes());
channel.basicPublish(Constant.ROUTING_EXCHANGE,"c",null,message3.getBytes());
connection.close();
channel.close();
}
}
和上面广播模式不同的是,在绑定队列与交换机的时候,需要指定bindingKey.channel.queueBind(Constant.ROUTING_QUEUE1,Constant.ROUTING_EXCHANGE,"a");
比如这一行指定了bindingKey为a
.
在发送消息的时候,需要指定消息的RoutingKey,比如:channel.basicPublish(Constant.ROUTING_EXCHANGE,"a",null,message1.getBytes());
.
- 编写消费者
Routing模式的消费者代码和Publish/Subscribe代码⼀样,同样复制出来两份.修改消费的队列名称就可以.
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.ROUTING_QUEUE1,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("接收到消息:"+message);
}
};
channel.basicConsume(Constant.ROUTING_QUEUE1,true,consumer);
}
}
- 运行代码
队列中的消息被消费.
队列2中的消息:
队列1中的消息:
2.5 通配符模式
Topic和Routing模式的区别就是:
1. Topic模式使用的交换机类型为topic(Routing模式使⽤的交换机类型为direct).
2. Topic类型的交换机在匹配规则上进行了扩展,bindingKey支持通配符匹配.
在Topic类型的交换机在匹配的规则上,有一些要求:
1. RoutingKey是一系列由.
分割的单词,比如"a.b.c"
.
2. BindingKey和RoutingKey一样,也是点.
分割的字符串.
3. BindingKey中可以存在两种特殊字符串,用于模糊匹配.其中*
表示一个单词,#
表示多个单词.
比如:
• BindingKey为"d.a.b"会同时路由到Q1和Q2.
• BindingKey为"d.a.f"会路由到Q1.
• BindingKey为"c.e.f"会路由到Q2.
• BindingKey为"d.b.f"会被丢弃,或者返回给⽣产者(需要设置mandatory参数).
接下来我们就来实现Topic模式:
- 编写生产者
和路由模式最大的区别就是: 交换机类型不同,绑定队列的RoutingKey不同.
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);
channel.queueDeclare(Constant.TOPIC_QUEUE2,true,false,false,null);
channel.exchangeDeclare(Constant.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true,false,false,null);//交换机类型不同
channel.queueBind(Constant.TOPIC_QUEUE1,Constant.TOPIC_EXCHANGE,"*.a.*");
channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"*.*.b");
channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"c.#");//bindingKey不同
String message1 = "hello_b.a.c";
String message2 = "hello_a.c.b";
String message3 = "hello_c.a.b";
channel.basicPublish(Constant.TOPIC_EXCHANGE,"b.a.c",null,message1.getBytes());
channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.a.b",null,message3.getBytes());
channel.basicPublish(Constant.TOPIC_EXCHANGE,"a.c.b",null,message2.getBytes());
channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.a.b",null,message3.getBytes());
channel.close();
connection.close();
}
}
- 编写消费者
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("收到消息: "+s);
}
};
channel.basicConsume(Constant.TOPIC_QUEUE1,true,consumer);
}
}
第二个消费者直接修改一下队列名称就可以.
- 运行结果
队列产生消息,并消费
消费者根据RoutingKey收到了关键字:
消费者2:
消费者1:
2.6 RPC通信(RPC)
RPC通信,是远程过程调用,它是一种发送请求,得到响应的模式,有点类似与我们之前学习网络时候的http协议.
RabbitMQ实现RPC通信的过程,大概是通过两个队列实现⼀个可回调的过程.
在这个模式中,没有明确的生产者和消费者,在发送请求的时候,客户端是生产者,服务端是消费者,在接收响应的时候,服务端是生产者,客户端是消费者.
大概的流程如下:
- 客户端发送消息到⼀个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了⼀个回调队列,在消息中还会设置correlate_id,这个字段用于保证接收到的响应是与请求对应的响应,服务端处理后,会把响应结果发送到这个队列.
- 服务端接收到消息之后,处理请求并发送响应消息到replyTo指定的回调队列.
- 客户端在回调队列上等待响应消息.⼀旦收到响应,客户端会检查消息的correlationId属性,以确保它是所期望的响应.
接下来我们来实现PCR模式:
- 客户端代码
客户端代码的主要流程如下:
首先声明两个队列,包含回调队列replyQueueName,和发送请求的队列.并声明本次请求的唯一标志corrld.之后将replyQueueName和corrld配置到要发送的消息队列中.之后使用阻塞队列来阻塞当前的进程,监听回调队列中的消息,把请求放到阻塞队列中.阻塞队列中有消息之后,主线程被唤醒,打印返回的内容.
public class Client {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.SRC_REQUEST_QUEUE,true,false,false,null);
channel.queueDeclare(Constant.SRC_RESPONSE_QUEUE,true,false,false,null);
//发送请求
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.correlationId(corrId)
.replyTo(Constant.SRC_RESPONSE_QUEUE)//指定相关属性
.build();
String request = "发送请求";
//如果没有交换机的时候,RoutingKey就是队列的名称
channel.basicPublish("",Constant.SRC_REQUEST_QUEUE,properties,request.getBytes());
//接收请求
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if(corrId.equals(properties.getCorrelationId())){//判断corrId是否相等
queue.offer(new String(body));
}
}
};
channel.basicConsume(Constant.SRC_RESPONSE_QUEUE,true,consumer);
String ret = queue.take();
System.out.println("收到请求:" + ret);
channel.close();
connection.close();
}
}
- 服务端代码
服务端需要做的就是:接收消息,根据消息内容进行响应处理,把应答结果返回到回调队列中.
public class Server {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Constant.SRC_REQUEST_QUEUE,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("成功接收到请求:"+new String(body));
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
String message = "返回响应";
channel.basicPublish("",Constant.SRC_RESPONSE_QUEUE,basicProperties,message.getBytes());
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(Constant.SRC_REQUEST_QUEUE,false,consumer);//设置收到消息之后不自动应答,在发送响应之后手动应答
}
}
RabbitMQ消息确定的机制:
在RabbitMQ中,basicConsumer方法的autoAck参数用于指定消费者是否应该自动向消息队列确认消息.
自动确认(autoAck=true):消息队列在将消息发送给消费者后,会立即从内存中删除该消息.这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
手动确认(autoAck=false):消息队列在将消息发送给消费者后,需要消费者显式地调用basicAck方法来确认消息.手动确认提供了更高的可靠性,确保消息不会被意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景.
- 运行程序
2.7 发布确认(Publisher Confirms/消息可靠性保证)
2.7.1 概述
作为消息中间件,都会面临丢失的问题.
消息丢失大概分为三种类型:
- 生产者问题.因为应用程序故障,网络都用等原因,生产者没有成功向Broker发送消息.
- 消息中间件自身问题,生产者成功发送给了Broker,但是Broker没有吧消息保存好,导致了信息丢失.
- 消费者问题,Broker发送消息到消费者,消费者在消费消息的时候,没有处理好,导致Broker将消费失败的消息从队列中删除了.
上面的这几个问题都有对应的解决方式,问题2可以通过持久化的机制来解决,问题3可以通过消息应答机制来解决,问题1,可以采用发布确认的机制来实现.
发布确认属于RabbitMQ的七大工作模式之一.
生产者会将Channel设置为Confirm模式,一旦信道进入Confirm模式,所有在该信道上面的消息都会被指派为一个唯一的Id.一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经确认到达目的队列了.Broker回传给生产者的确认消息中包含了确认消息的序号,此外Broker也可以设置channel.basicAck方法中的multiple参数,true表示在deliveryTag序号之前的消息都已经收到了,如果为false,那么则有消息没有收到,消息确认出了一些问题.
使用发送机制的时候,必须要将信道设置为Confirm模式.发布确认有3种策略,单独确认,批量确认,异步确认,接下来我们就来学习着三种策略.
下面是开启信道确认模式的方法.
Channel channel = connection.createChannel();
channel.confirmSelect();//开启信道确认模式
2.7.2 单独确认
首先我们需要建立连接,建立连接需要放入try语句中,所以我们可以把建立连接单独提出一个静态方法.
private static Connection createConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.HOST);
connectionFactory.setPort(Constant.PORT);
connectionFactory.setUsername(Constant.USER_NAME);
connectionFactory.setPassword(Constant.PASSWORD);
connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
return connectionFactory.newConnection();
}
之后在主方法中调用三种策略,
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
publishingMessagesIndividually();
publishMessagesInBatch();
handlePublishConfirmsAsynchronously();
}
之后我们来编写单条确认模式:
/**
* 单条确认
*/
private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = createConnection()){//首先建立连接
Channel channel = connection.createChannel();
channel.confirmSelect();//开启信道确认模式
channel.queueDeclare(Constant.CONFIRM_QUEUE1,true,false,false,null);//创建队列
String message = "发送信息";
for (int i = 0; i < 200; i++){
channel.basicPublish("",Constant.CONFIRM_QUEUE1,null,(message+i).getBytes());
channel.waitForConfirms(5000);//等待消息确认,如果超过规定的等待时间还没有确认,则抛出异常
}
}
}
在编写代码之前,我们首先要使用try语句与服务器建立连接.
之后在创建Channel之后,需要开启信道的确认模式.channel.confirmSelect()
.
这里我们在发送消息之后,需要做的最重要的一件事就是等待消息的确认channel.waitForConfirms(5000)
,在这个方法中可以指定阻塞时间,如果在指定的时间内消息被确认,这个方法就会立即返回,如果在指定时间之内没有确认消息,则会抛出异常.
2.7.3 批量确认
/**
* 批量确认
*/
private static void publishMessagesInBatch() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = createConnection()){
Channel channel = connection.createChannel();
channel.confirmSelect();//设置为确认模式
channel.queueDeclare(Constant.CONFIRM_QUEUE2,true,false,false,null);
String message = "批量确认发送消息";
int batchSize = 100;//每次批量发送的消息条数
int outstandingMessageCount = 0;//记录已经发送的条数
for (int i = 0 ; i < 200 ; i++){
channel.basicPublish("",Constant.CONFIRM_QUEUE2,null,(message+i).getBytes());
outstandingMessageCount++;//每发送一条消息,参数就进行++
if(outstandingMessageCount == batchSize){//达到了一次性批量发送的指定数量,等待确认,确认完成之后,将参数清零
channel.waitForConfirms(5000);
outstandingMessageCount = 0;
}
}
//如果发送的消息不是100的倍数,就还有消息没有确认
if (outstandingMessageCount > 0){
channel.waitForConfirms(5000);
}
}
}
这里需要注意的几点就是,在发送的消息达到一次性最大的批量数量的时候,就要确认,如果确认成功之后,需要把记录的发送数量清零.
之后,就是在出循环之后,如果发送的消息条数不是batchSize
的整数倍的时候,这时候不满足循环之内的if条件,还是有一些消息没有确认完成,就需要在循环之外再次进行确认.
2.7.4 异步确认
异步确认就是,生产者在发送消息的同时,还可以确认消息是否收到.
Channel接口中为我们提供了一个方法,addConfirmListener
.这个方法可以添加ConfirmListener
回调接口.
ConfirmListener
中包含两个方法: handleAck(long deliveryTag, boolean multiple)
和handleNack(long deliveryTag, boolean multiple)
,分别对应处理的是MQ发送给生产者的ack和nack.其中ack代表的是消息确认成功,即消息都收到的情况下,nack指的是消息在确认的时候出现了一些问题.deliveryTag
表示的是发送消息的序号,multiple
表示是否批量确认.
这里我们还需要一个有序集合来存储为确认的消息.
/**
* 异步确认
*/
private static void handlePublishConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = createConnection()){
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.queueDeclare(Constant.CONFIRM_QUEUE3,true,false,false,null);
SortedSet<Long> set = Collections.synchronizedSortedSet(new TreeSet<>());//设置一个有序集合,用来存储未确认消息的序号
channel.addConfirmListener(new ConfirmListener() {//为信道添加监听器,监听消息的确认情况
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {//表示消息被成功确认
if (multiple){//判断消息的处理是否批量
set.headSet(deliveryTag+1).clear();//将小于deliveryTag的消息全部清除,证明这批消息已经被ack了
}else {
set.remove(set.last());//如果不是批量,清除最后一个即可
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple){//判断消息的处理是否批量
set.headSet(deliveryTag+1).clear();//将小于deliveryTag的消息全部清除,证明这批消息已经被ack了
}else {
set.remove(set.last());//如果不是批量,清除最后一个即可
}
//这里在消息确认不成功的时候,需要重传,这里省略
}
});
String message = "异步确认发送消息";
for (int i = 0 ;i < 200 ;i++){
long nextPublishSeqNo = channel.getNextPublishSeqNo();//获取到消息发送的序号
channel.basicPublish("",Constant.CONFIRM_QUEUE3,null,(message+i).getBytes());
set.add(nextPublishSeqNo);//把这些消息都添加到集合中
}
while (!set.isEmpty()){//等待集合中的消息都被确认完成
Thread.sleep(1000);
}
}
}
这里我们在消息确认成功之后,即handleAck
方法被调用的时候,需要把这些消息都从集合中清除掉.一种是批量的情况,直接清除掉deliveryTag
之前所有的消息,另一种是没有批量的情况,直接清除掉最后一个元素即可.当然在没有确认成功的情况下,我们需要根据具体的业务逻辑进行消息的重发.在给队列中发送消息的时候,我们需要从Channel中获取到下次发送消息开始的序号,之后我们把开始的序号放入set中,代表这些消息还没有被处理过.最后我们需要等待消息确认完成,只要存放未确认消息的set中不为空,就证明还有消息没有被确认,我们就进行阻塞等待.
上面三种方式中,假如发送的消息较多,这三种策略的执行时间:单个确认>批量确认>异步确认.