参考资料
- RabbitMQ官方网站
- RabbitMQ官方文档
- 噼咔噼咔-动力节点教程
文章目录
- 八、RabbitMQ的确认机制 -confirm
- 8.1 Confirm 模式简介
- 8.2 具体代码设置
- 8.2.1 **设置思路**:
- 8.2.2 **代码实现**
- 8.2.2.1 开启生产者的确认模式.
- 8.2.2.2 实现接口ComfirmCallback
- 8.2.2.3 配置rabbitTemplate并发送消息
- 8.2.2.3 测试运行
- 8.3 使用匿名内部类优化代码写法
- 8.4 :star: 推荐-使用lambda表达式实现确认模式
- 九、RabbitMQ消息Return模式
- 9.1 返回模式概念
- 9.2 具体代码设置
- 9.2.1 开启确认模式
- 9.2.2 实现ReturnsCallback接口
- 9.2.3 设置回调函数
- 9.2.3 :star:代码用例(lambda
- 9.2.4 测试结果
- 9.3 :star:使用Lambda函数实现返回模式
- 十、RabbitMQ交换机的属性
- 10.1 具体属性
- 10.2 交换机持久化
- 10.3 自动删除
- 10.4 内部交换机和备用交换机
八、RabbitMQ的确认机制 -confirm
8.1 Confirm 模式简介
消息队列的 confirm 确认机制,是指生产者投递消息后,到达了消息服务器 Broker 里面的exchange 交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到 Broker 的 exchange 中。
这也是消息可靠性投递的重要保障。
用于确保消息传递是否正常,但是会牺牲一些性能,但是提高了系统运行的稳定性
8.2 具体代码设置
8.2.1 设置思路:
8.2.2 代码实现
基于之前的案例中的直连路由器进行测试。
需要传递如下的参数才能成功发送消息。
8.2.2.1 开启生产者的确认模式.
在配置文件中,开启生产者的确认模式:
- 与CorrelationData一起使用可将确认与已发送的消息关联起来。
# Use with CorrelationData to correlate confirmations with sent messsages.
publisher-comfirm-type: correlated
8.2.2.2 实现接口ComfirmCallback
@Component
@Slf4j
public class DemoRabbitCallback implements RabbitTemplate.ConfirmCallback {
/**
* 证实
*
* @param correlationData 相关数据
* @param ack 是否应答成功
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
log.info("(确认模式) 消息正常发送:{}" , correlationData);
return;
}
log.error("(确认模式) 消息发送异常 :{},异常原因:{}" , correlationData , cause);
}
}
8.2.2.3 配置rabbitTemplate并发送消息
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ConfirmTestController {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private DemoRabbitCallback demoRabbitCallback;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(demoRabbitCallback);
log.info("(确认模式)成功配置回调方法");
}
@GetMapping("/{key}")
public void sentErrorMsg(@PathVariable("key") String key) {
String msg = "...确认模式测试消息...";
// 声明一个相关性数据,可以用于辅助确认本次请求的信息
CorrelationData correlationData = new CorrelationData();
String uuid = "ORDER_NUM_"+UUID.randomUUID();
correlationData.setId(uuid);
log.info("(确认模式)准备发送的信息:{} , 交换机名称是 :{} , 相关数据信息:{} ", msg, key , correlationData);
// 发送消息时添加参数:CorrelationData
rabbitTemplate.convertAndSend(key,
"error",
msg.getBytes(StandardCharsets.UTF_8),
correlationData);
}
}
8.2.2.3 测试运行
尝试发送一个交换机名称错误的信息。异常信息如下:
(确认模式) 消息发送异常 :CorrelationData [id=ORDER_NUM_084864b0-ee07-4e60-b19c-d360edeecb27],异常原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xcong.directaaaaa' in vhost 'hc-test', class-id=60, method-id=40)
发送一个正确的名称时/confirm/xcong.direct
:也会打印日志:
(确认模式) 消息正常发送:CorrelationData [id=ORDER_NUM_744c6441-4075-4c4b-ba3f-530578b901aa]
进入控制台也可以看该消息,而且可以发现,相关信息是存放在headers中
8.3 使用匿名内部类优化代码写法
教程里提到的写法,将消息发送的服务和回调方法的实现合在一个类中完成。类似下面这样
省一个类而已,局限性比较大,不适合全局部署。根据实际需求场景使用。
另外一种方法即使用匿名内部类。效果如下,跟上面的方法类似,不是很好用。
8.4 ⭐️ 推荐-使用lambda表达式实现确认模式
- 代码简洁
- 阅读直观
当接口只有一个方法,且添加了注解
@FunctionalInterface
时,就可以尝试用lambda表示简洁写法
最终效果改造如下(测试和之前的示例一样
@RestController
@RequestMapping("/v2/confirm")
@Slf4j
public class ConfirmTestController2 {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(
// 参数类型可以省略 CorrelationData correlationData, boolean ack, String cause
(correlationData, ack, cause) -> {
if (ack) {
log.info("(确认模式) 消息正常发送:{}", correlationData);
return;
}
log.error("(确认模式) 消息发送异常 :{},异常原因:{}", correlationData, cause);
}
);
log.info("(确认模式)成功配置回调方法");
}
九、RabbitMQ消息Return模式
9.1 返回模式概念
刚才我们已经确保了“生产者——>交换机”的消息可靠性,接下来的返回模式就是确保“exchange——>queue”的消息可靠性
- P -> X ——确认模式
- X -> Q ——返回模式
9.2 具体代码设置
思路和确认模式类型,不贴代码了,直接截图作为参考即可
9.2.1 开启确认模式
在配置文件中开启确认模式
publisher-returns: true
9.2.2 实现ReturnsCallback接口
9.2.3 设置回调函数
9.2.3 ⭐️代码用例(lambda
@Slf4j
@RestController
@RequestMapping("/return")
public class ReturnTestController {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(
(returnedMessage) -> {
log.info("(返回模式) 异常信息内容是:{}", returnedMessage);
}
);
}
@GetMapping("/{key}/{msg}")
public void sentErrorMsg(@PathVariable("msg") String msg, @PathVariable("key") String key) {
log.info("(返回模式) 准备发送的信息:{} , 路由键 :{}", msg, key);
rabbitTemplate.convertAndSend(exchangeName, key, msg.getBytes(StandardCharsets.UTF_8));
}
}
9.2.4 测试结果
-
发送一个错误的路由key
/return/error1/返回模式测试消息
-
发送一个正确的路由key时:不会触发回调函数
9.3 ⭐️使用Lambda函数实现返回模式
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(
returnedMessage-> {
log.info("(返回模式) 异常信息内容是:{}", returnedMessage);
}
);
}
十、RabbitMQ交换机的属性
10.1 具体属性
- Name:名称
- Type:类型,direct/fanout/topic/headers
- Durability:持久化。
- auto Delete:自动删除。
- intenal:内部交换机
- Arguments:自定义参数
- Alternate exchange:备用交换机
10.2 交换机持久化
- 声明交换机是否持久化,服务器重启后是否还在
- 默认是持久化
- 如果是非持久化,服务器重启后,交换机的数据就会丢失
10.3 自动删除
- 如果true,当所有的队列和交换接触和交换机的绑定后,交换器将删除自身
- 删除的结点:最后一个队列或者交换机接触绑定时。
- 默认为false
10.4 内部交换机和备用交换机
- intenal属性值为yes(true)时,该交换机无法被客户端直接访问,只能作为和其他交换机的相互绑定中。
- 在参数配置中,添加
alternate-exchange
的值,可以声明一个备用交换机:- 当一个消息无法被routed时,可以将他们的信息转发到备用交换机上。
实际的应用场景:
可以用于存储客户端发送错误routingKey的信息,防止消息发送时指定了错误的key,导致消息丢失的情况。
也是一种保证消息可靠性投递的方式