文章目录
- 消息确认机制
- 事务机制
- Confirm模式
消息确认机制
生产者将消息发送出去之后,AMQP 协议中实现了 事务机制 和 Confirm 模式 两种方式确认消息有没有到达 RabbitMQ 服务器
事务机制
channel.txSelect()
声明启动事务模式;
channel.txCommit()
提交事务;
channel.txRollback()
回滚事务;
Connection conn = ConnectionUtils.getConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String str = "hello rabbitmq";
channel.txSelect();//开启事务
try {
channel.basicPublish("",QUEUE_NAME,null,str.getBytes());
//加入错误代码后事务回滚
int i = 1/0;
channel.txCommit();//提交事务
} catch (IOException e) {
e.printStackTrace();
channel.txRollback();//回滚事务
} finally {
channel.close();
conn.close();
}
模式缺点:事务机制通常涉及到对消息队列服务器的锁定和解锁操作,这些操作会增加额外的延迟,从而降低系统的吞吐量。在事务开启期间,通道(channel)会被阻塞,直到事务被提交或回滚,这期间无法进行其他操作。
事务会锁定涉及的资源,直到事务完成。如果事务处理时间较长,或者有大量的事务同时进行,可能会导致资源竞争和锁定问题,影响系统的并发能力。
Confirm模式
方式一:channel.waitForConfirms()
普通发送方确认模式;
Connection conn = ConnectionUtils.getConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String str = "hello rabbitmq";
channel.confirmSelect();//开启消息确认模式
channel.basicPublish("",QUEUE_NAME,null,str.getBytes());
//加入错误代码后事务回滚
int i = 1/0;
if (channel.waitForConfirms()) {
System.out.println("消息确认发送");
}
channel.close();
conn.close();
channel.confirmSelect() 声明开启发送方确认模式,再使用 channel.waitForConfirms() 等待消息被服务器确认即可。
方式二:channel.waitForConfirmsOrDie()
批量确认模式;
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String message = "hello rabbitmq";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会
IOException
System.out.println("全部执行完成");
channel.confirmSelect() 声明开启发送方确认模式,再使用 channel.waitForConfirmsOrDie() 等待消息被服务器确认即可。
方式三:channel.addConfirmListener()
异步监听发送方确认模式;
Connection conn = ConnectionUtils.getConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();//开启消息确认
//监听消息确认
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println(String.format("已确认消息,标识:%d,多个消息:%b",deliveryTag, multiple));
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + multiple);
}
});
for (int i = 0; i < 30; i++) {
//发送消息
String msg = "hello rabbitmq";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
Thread.sleep(10000);
System.out.println("发送成功!!!");
//释放资源
channel.close();
conn.close();
- Multiple:是否多条
- deliveryTag:如果是多条,这个就是最后一条消息的 tag
异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可。