解决消息不丢失的一个重要环节。
前面说过消息持久化,可能出现一种情况就是:
尽管它告诉rabbitmq将消息保存到磁盘,但是依然存在当消息刚准备存储到磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点。此时消息并没有真正的写入磁盘。持久性保证并不强。
发布确认:
就是当消息真正保存到磁盘上的时候,mq要跟生产者说一声真的将消息保存到磁盘上了。
第一步:开启发布确认的方法
发布确认默认是关闭的。
生产者的信道上开启
//开启发布确认 channel.confirmSelect();
发布的类型:
单个确认发布
同步确认发布方式,缺点:发布速度特别慢。
/**
* 单个确认,同步
* @throws IOException
*/
public static void confirmSingle() throws IOException, InterruptedException {
Channel channel = RabbitMQUtil.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//单个消息发送后,马上进行确认
boolean flag = channel.waitForConfirms();
if(flag) {
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条消息,单个消息确认消耗时间为:"+(end - start)+"ms");
}
// 发布1000条消息,单个消息确认消耗时间为:851ms
批量确认发布
比单个确认的速度快,但是缺点是当发生故障导致发布出现问题时,不知道是那个消息出现了问题,这种方案也是同步的,一样阻塞消息的发布。
public static void batchConfirm() throws Exception {
Channel channel = RabbitMQUtil.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
long start = System.currentTimeMillis();
int batch = 100;
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//批量处理消息,每一百条消息进行一次确认
if(i % batch == 0) {
boolean flag = channel.waitForConfirms();
if(flag) {
System.out.println("消息发送成功");
}
}
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条消息,批量消息确认(每100条确认一次)消耗时间为:"+(end - start)+"ms");
}
发布1000条消息,批量消息确认(每100条确认一次)消耗时间为:132ms
异步确认发布
逻辑上比前两个要复杂,但是性能比较高。无论是可靠性和效率都没得说。利用回调函数来达到消息的可靠性传递的。
broker会异步通知。
/**
* 异步确认发布
* @throws Exception
*/
public static void asyncConfirm() throws Exception {
Channel channel = RabbitMQUtil.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
//准备一个安全有序的一个哈希表,适用于高并发情况下
//1.轻松将序号和消息进行关联
//2.轻松批量删除条目,只要给到序号
//3.支持高并发多线程
ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
long start = System.currentTimeMillis();
//消息确认成功回调函数
//第一个参数:消息的标记
//第二个参数:是否为批量
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
//删除掉已经确认的消息,剩下的就是未确认的消息
if(multiple) { //批量确认
ConcurrentNavigableMap<Long, String> concurrentNavigableMap = concurrentSkipListMap.headMap(deliveryTag);
concurrentNavigableMap.clear();//全部删除
}else {//单个
concurrentSkipListMap.remove(deliveryTag);
}
System.out.println("确认的消息:" + deliveryTag);
};
//消息确认失败回调函数
ConfirmCallback nackCallBack = (deliveryTag, multiple) -> {
//打印一下未确认消息有哪些
String message = concurrentSkipListMap.get(deliveryTag);
System.out.println("未确认的消息:" + message + "消息tag=" + deliveryTag);
};
//准备消息的监听器,监听哪些消息成功了,哪些消息失败了
channel.addConfirmListener(ackCallback, nackCallBack);
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//记录下所有要发送的消息,消息的总和
concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条消息,批量异步消息确认消耗时间为:"+(end - start)+"ms");
}
发布1000条消息,批量异步消息确认消耗时间为:39ms
如何处理异步未确认消息
最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列,在confirm callbacks与发布线程之间进行消息的传递。
两个线程:一个线程负责监听,另一个线程负责发送消息。