这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。
目录
一、消息队列
1.1、发布确认模式
1.2、案例代码
(1)引入依赖
(2)编写生产者【消息确认--单条确认】
(3)编写生产者【消息确认--批量确认】
(4)编写生产者【消息确认--异步确认】
一、消息队列
1.1、发布确认模式
RabbitMQ消息队列中,生产者发送消息给RabbitMQ的时候,可能会出现发送失败的情况,如果不进行处理,此时这一条消息就将丢失。如何确保生产者一定能够将消息发送到RabbitMQ里面呢???
RabbitMQ提出了一种发布确认模式,这种模式大致思想是:生产者发送消息给RabbitMQ时候,如果RabbitMQ正确接收到消息后,需要发给一个ACK标识给生产者,生产者接收到ACK标记后,就可以确认这一条消息发送成功啦。如果生产者没有接收到ACK标识,则可以重复发送这一条消息给RabbitMQ,这就可以确保消息不丢失。
发布确认模式有三种实现,分别是:逐条确认机制、批量确认机制、异步确认机制。
1.2、案例代码
(1)引入依赖
<!-- 引入 RabbitMQ 依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
(2)编写生产者【消息确认--单条确认】
- 生产者发送消息的时候,需要调用【confirmSelect()】方法开启消息确认机制。
- 生产者将消息发送完成之后,需要调用【waitForConfirms()】方法,阻塞等待RabbitMQ消息队列返回ACK标识。这个方法返回一个boolean类型,true表示RabbitMQ接收消息成功,false表示接收失败。
- 【waitForConfirms()】方法还可以指定一个超时时间,如果在这个超时时间里面RabbitMQ还没有返回ACK标识,那么该方法将抛出一个InterruptedException中断异常。
package com.rabbitmq.demo.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @version 1.0.0
* @Date: 2023/2/25 16:23
* @Copyright (C) ZhuYouBin
* @Description: 消息生产者
*/
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接的 RabbitMQ 服务地址
factory.setHost("127.0.0.1"); // 默认就是本机
factory.setPort(5672); // 默认就是 5672 端口
// 3、获取连接
Connection connection = null; // 连接
Channel channel = null; // 通道
try {
connection = factory.newConnection();
// 4、获取通道
channel = connection.createChannel();
// TODO 开启消息确认机制
channel.confirmSelect();
// 5、声明 Exchange,如果不存在,则会创建
String exchangeName = "exchange_direct_2023";
channel.exchangeDeclare(exchangeName, "direct");
// 6、发送消息
for (int i = 0; i < 10; i++) {
// 路由键唯一标识
String routingKey = "error";
if (i % 3 == 0) {
routingKey = "info";
} else if (i % 3 == 1) {
routingKey = "warn";
}
String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
// 等待RabbitMQ返回ACK标识
boolean wait = channel.waitForConfirms();
System.out.println("RabbitMQ是否接收成功: " + wait);
if (!wait) {
// 消息发送失败,则可以重新发送
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {}
}
if (null != connection) {
try {
connection.close();
} catch (Exception e) {}
}
}
}
}
(3)编写生产者【消息确认--批量确认】
- 前一种方式,是一条消息就调用一次【waitForConfirms()】方法,阻塞等待RabbitMQ的ACK确认标识。
- 但是这种方式是非常耗时的,当需要发送的消息非常多的时候,会严重影响系统性能,所以为了解决这个问题,提出了批量确认的方法。
- 批量确认调用【waitForConfirmsOrDie()】方法,此时会等待一批消息的ACK确认标识,如果这一批消息中存在一个消息没有被RabbitMQ成功接收,此时该方法将抛出一个【IOException】异常。
- 所以,可以通过捕获IOException异常来判断消息是否发送成功。
- 这种方式的缺点:当一批消息出现失败的情况时候,我们没办法知道是哪一条消息失败了,只能够重新将这一批消息重新发送。
package com.rabbitmq.demo.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
/**
* @version 1.0.0
* @Date: 2023/2/25 16:23
* @Copyright (C) ZhuYouBin
* @Description: 消息生产者
*/
public class ProducerBatch {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接的 RabbitMQ 服务地址
factory.setHost("127.0.0.1"); // 默认就是本机
factory.setPort(5672); // 默认就是 5672 端口
// 3、获取连接
Connection connection = null; // 连接
Channel channel = null; // 通道
try {
connection = factory.newConnection();
// 4、获取通道
channel = connection.createChannel();
// TODO 开启消息确认机制
channel.confirmSelect();
// 5、声明 Exchange,如果不存在,则会创建
String exchangeName = "exchange_direct_2023";
channel.exchangeDeclare(exchangeName, "direct");
// 6、发送消息
int batchSize = 3;
int count = 0;
for (int i = 0; i < 10; i++) {
// 路由键唯一标识
String routingKey = "error";
if (i % 3 == 0) {
routingKey = "info";
} else if (i % 3 == 1) {
routingKey = "warn";
}
String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
// 批量确认
if (count == batchSize) {
// 等待RabbitMQ返回ACK标识
channel.waitForConfirmsOrDie();
count = 0;
}
count++;
}
} catch (IOException e) {
System.out.println("消息发送失败啦");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {}
}
if (null != connection) {
try {
connection.close();
} catch (Exception e) {}
}
}
}
}
(4)编写生产者【消息确认--异步确认】
- 异步确认在消息发送之后,调用【addConfirmListener()】方法,该方法介绍两个参数,第一个参数是成功接收到ACK标识的回调方法,第二个参数是失败接收到NACK标识的回调方法。
- 注意:一定要先调用【addConfirmListener()】监听方法,然后再发送消息,如果两者顺序反了,则监听方法不生效。
package com.rabbitmq.demo.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
/**
* @version 1.0.0
* @Date: 2023/2/25 16:23
* @Copyright (C) ZhuYouBin
* @Description: 消息生产者
*/
public class ProducerAsync {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接的 RabbitMQ 服务地址
factory.setHost("127.0.0.1"); // 默认就是本机
factory.setPort(5672); // 默认就是 5672 端口
// 3、获取连接
Connection connection = null; // 连接
Channel channel = null; // 通道
try {
connection = factory.newConnection();
// 4、获取通道
channel = connection.createChannel();
// TODO 开启消息确认机制
channel.confirmSelect();
// 5、声明 Exchange,如果不存在,则会创建
String exchangeName = "exchange_confirm_2023";
channel.exchangeDeclare(exchangeName, "direct");
// TODO 一定要先调用监听接口,在发送消息
channel.addConfirmListener(new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
System.out.println("RabbitMQ接收成功啦.....消息的标识deliveryTag=" + deliveryTag
+ ",批量发送多条消息multiple=" + multiple);
}
}, new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
System.out.println("RabbitMQ接收失败啦.....");
}
});
for (int i = 0; i < 10; i++) {
// 6、发送消息
String message = "这是发布确认模式,发送的消息数据";
channel.basicPublish(exchangeName, "queue_confirm_2023", null, message.getBytes());
}
} catch (IOException e) {
System.out.println("消息发送失败啦");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {}
}
if (null != connection) {
try {
connection.close();
} catch (Exception e) {}
}
}
}
}
到此,RabbitMQ消息队列中的发布确认模式就介绍完啦。
综上,这篇文章结束了,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。