RabbitMQ 消息持久化
持久化是为提高rabbitmq消息的可靠性,防止在异常情况(重启,关闭,宕机)下数据的丢失。设置完队列和消息的持久化,并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但当 RabbitMQ 接受消息但尚未保存消息时,仍有一个较短的时间窗口。另外, RabbitMQ 不会对每条消息都执行 fsync(2) – 它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认(RabbitMQ 消息确认机制)。
首先定义好获取RabbitMQ连接的工具类
/**
* 类描述:
* 连接工具类
* @author cfl
* @version 1.0
* @date 2022/10/10 13:49
*/
public class ConnectionUtil {
/**
* 获取MQ的连接
* @return
*/
public static Connection getConnection() throws IOException, TimeoutException {
// 定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost("localhost");
// AMQP 5672
factory.setPort(5672);
// vhost
factory.setVirtualHost("/vhost01");
// 用户名
factory.setUsername("admin");
// 密码
factory.setPassword("123456");
return factory.newConnection();
}
}
重启RabbitMQ服务
下面介绍几种重启RabbitMQ的方法:
- 使用 rabbitmq-service(前提是使用
rabbitmq-service install
安装RabbitMQ服务)
D:\application\rabbitmq\rabbitmq_server-3.10.8\sbin>rabbitmq-service stop
RabbitMQ 服务正在停止...
RabbitMQ 服务已成功停止。
D:\application\rabbitmq\rabbitmq_server-3.10.8\sbin>rabbitmq-service start
RabbitMQ 服务正在启动 .
RabbitMQ 服务已经启动成功。
- net命令
D:\application\rabbitmq\rabbitmq_server-3.10.8\sbin>sc query RabbitMQ
SERVICE_NAME: RabbitMQ
TYPE : 10 WIN32_OWN_PROCESS
STATE : 4 RUNNING
(STOPPABLE, NOT_PAUSABLE, ACCEPTS_SHUTDOWN)
WIN32_EXIT_CODE : 0 (0x0)
SERVICE_EXIT_CODE : 0 (0x0)
CHECKPOINT : 0x0
WAIT_HINT : 0x0
D:\application\rabbitmq\rabbitmq_server-3.10.8\sbin>net stop RabbitMQ
RabbitMQ 服务正在停止...
RabbitMQ 服务已成功停止。
D:\application\rabbitmq\rabbitmq_server-3.10.8\sbin>net start RabbitMQ
RabbitMQ 服务正在启动 .
RabbitMQ 服务已经启动成功。
- 使用 rabbitmq-server(前提RabbitMQ是以
rabbitmq-server -detached
启动)
启动服务:rabbitmq-server -detached
重启服务:rabbitmq-server restart
关闭服务:rabbitmqctl stop
查看状态:rabbitmqctl status
交换机开启持久化
RabbitMQ只有队列才有存储消息的能力,也就是说只要队列和消息开启了持久化,那么交换机开不开都不影响,但是为了统一规则,也可以将交换机申明成持久化:
// true申明为持久化的交换机
channel.exchangeDeclare("test_durable_exchange", BuiltinExchangeType.DIRECT, true);
如果交换机不申明持久化,那么当RabbitMQ重启的时候,交换机会丢失(不会影响队列和消息)。
队列关闭持久化
总结:如果生产者在定义队列时将其设置关闭持久化,那么在RabbitMQ重启服务时,队列和消息都会丢失。
示例:
/**
* 类描述:
* 队列不持久化
* @author cfl
* @version 1.0
* @date 2022/10/13 21:55
*/
public class Send1 {
//~fields
//==================================================================================================================
public static final String QUEUE_NAME = "test_durable_queue";
//~methods
//==================================================================================================================
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 申明队列,关闭持久化(durable=false)
boolean durable = false;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
for (int i = 0; i < 50; i++) {
String message = "hello durable " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("send durable end");
channel.close();
connection.close();
}
}
运行上面代码,运行完成后查看RabbitMQ的队列信息:
可以看到,队列被创建了,并且有50条消息
重启RabbitMQ服务后,可以看到刚刚创建的队列,消息都不存在了。
队列开启持久化
总结:如果生产者在定义队列时将其设置开启持久化,那么在RabbitMQ重启服务时,队列依然存在,但是消息全都丢失了。
示例:
/**
* 类描述:
* 队列持久化
* @author cfl
* @version 1.0
* @date 2022/10/13 21:55
*/
public class Send2 {
//~fields
//==================================================================================================================
public static final String QUEUE_NAME = "test_durable_1_queue";
//~methods
//==================================================================================================================
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 申明队列,开启持久化(durable=true)
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
for (int i = 0; i < 50; i++) {
String message = "hello durable " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("send durable end");
channel.close();
connection.close();
}
}
运行上面代码,运行完成后查看RabbitMQ的队列信息:
重启RabbitMQ服务后,可以看到队列依然存在,但是队列中的消息已经丢失了
如果RabbitMQ已经对队列进行申明,如果我们再次申明队列时,修改了申明队列的参数,那么会出现错误。
队列开启持久化 - 消息开启持久化
总结:队列和消息都开启了持久化,RabbitMQ服务重启后,队列和消息都会恢复。
队列持久化参考上面的代码,首先我们先查看发送消息的源码的常用API(一共有三个):
/**
* Publish a message. 发布消息
*
* Publishing to a non-existent exchange will result in a channel-level
* protocol exception 发布到不存在的交换器会导致通道级协议异常,从而关闭通道, which closes the channel.
*
* Invocations of <code>Channel#basicPublish</code> will eventually block if a
* <a href="https://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect. 如果资源驱动的警报 生效, Channel#basicPublish的调用最终将被阻塞。
*
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @see <a href="https://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
* @param exchange the exchange to publish the message to 将消息发布到的交换机
* @param routingKey the routing key 路由键
* @param props other properties for the message - routing headers etc 消息的其他属性 - 路由标头等
* @param body the message body 消息正文
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
创建 BasicProperties对象props:
方式一:
自己创建BasicProperties对象
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
// deliveryMode=1表明不持久化消息;deliveryMode=2表明持久化消息
.builder().deliveryMode(2)
.build();
方式二:
使用com.rabbitmq.client.MessageProperties
类中定义的6个静态成员变量:
/**
* Constant holder class with useful static instances of {@link AMQContentHeader}.
* These are intended for use with {@link Channel#basicPublish} and other Channel methods.
*/
public class MessageProperties {
/** Empty basic properties, with no fields set 空的基本属性,没有设置字段*/
public static final BasicProperties MINIMAL_BASIC =
new BasicProperties(null, null, null, null,
null, null, null, null,
null, null, null, null,
null, null);
/** Empty basic properties, with only deliveryMode set to 2 (persistent) 空的基本属性,只有 deliveryMode 设置为 2(持久)*/
public static final BasicProperties MINIMAL_PERSISTENT_BASIC =
new BasicProperties(null, null, null, 2,
null, null, null, null,
null, null, null, null,
null, null);
/** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero 内容类型“application/octet-stream”,deliveryMode 1(非持久),优先级为零*/
public static final BasicProperties BASIC =
new BasicProperties("application/octet-stream",
null,
null,
1,
0, null, null, null,
null, null, null, null,
null, null);
/** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero 内容类型“application/octet-stream”,deliveryMode 2(持久),优先级为零*/
public static final BasicProperties PERSISTENT_BASIC =
new BasicProperties("application/octet-stream",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
/** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero 内容类型“text/plain”,deliveryMode 1(非持久),优先级为零*/
public static final BasicProperties TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
1,
0, null, null, null,
null, null, null, null,
null, null);
/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero 内容类型“text/plain”,deliveryMode 2(持久),优先级为零*/
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
}
生产者代码:
/**
* 类描述:
* 队列持久化,消息持久化
* @author cfl
* @version 1.0
* @date 2022/10/13 21:55
*/
public class Send3 {
//~fields
//==================================================================================================================
public static final String QUEUE_NAME = "test_durable_3_queue";
//~methods
//==================================================================================================================
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 申明队列,开启持久化(durable=true)
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 自定义消息属性,设置持久化消息(deliveryMode=2)
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
// deliveryMode=1表明不持久化消息;deliveryMode=2表明持久化消息
.builder().deliveryMode(2)
.build();
for (int i = 0; i < 50; i++) {
String message = "hello durable " + i;
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
}
System.out.println("send durable end");
channel.close();
connection.close();
}
}
运行上面的程序,运行完成后查看RabbitMQ的队列信息:
重启RabbitMQ服务后,可以看到队列依然存在,队列中的消息也恢复了: