RabbitMQ学习-发布确认高级

news2025/1/9 13:50:44

发布确认springboot版本

确认机制方案:

 代码架构图:

 配置文件:

在application.properties全局配置文件中添加spring.rabbitmq.publish-confirm-type属性,这个属性有以下几种值

none:禁用发布确认模式(默认)0
correlated:发布消息成功到交换机后会触发回调方法
simple:有两种效果
第一种效果是和correlated一样会触发回调方法
第二种效果是在发布消息成功以后使用rabbitTemplate调用waitForConfirms或者waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判单下一步的逻辑
waitForConfirmsOrDie方法如果返回false则会关闭信道,那么接下来就无法发送消息到broker

spring.rabbitmq.host=182.92.234.71
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated

添加配置类

@Configuration
public class ConfirmConfig {
 public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
 public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
 //声明业务 Exchange
 @Bean("confirmExchange")
 public DirectExchange confirmExchange(){
 return new DirectExchange(CONFIRM_EXCHANGE_NAME);
 }
 // 声明确认队列
 @Bean("confirmQueue")
 public Queue confirmQueue(){
 return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
 }
 // 声明确认队列绑定关系
 @Bean
 public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
 @Qualifier("confirmExchange") DirectExchange exchange){
 return BindingBuilder.bind(queue).to(exchange).with("key1");
 }
}

消息生产者

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
 public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
 @Autowired
 private RabbitTemplate rabbitTemplate;
 @Autowired
 private MyCallBack myCallBack;
 //依赖注入 rabbitTemplate 之后再设置它的回调对象
 @PostConstruct
 public void init(){
 rabbitTemplate.setConfirmCallback(myCallBack);
 }
 @GetMapping("sendMessage/{message}")
 public void sendMessage(@PathVariable String message){
 //指定消息 id 为 1
 CorrelationData correlationData1=new CorrelationData("1");
 String routingKey="key1";
 
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correl
ationData1);
 CorrelationData correlationData2=new CorrelationData("2");
 routingKey="key2";
 
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correl
ationData2);
 log.info("发送消息内容:{}",message);
 }
}

回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
 /**
 * 交换机不管是否收到消息的一个回调方法
 * CorrelationData
 * 消息相关数据
 * ack
 * 交换机是否收到消息
 */
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 String id=correlationData!=null?correlationData.getId():"";
 if(ack){
 log.info("交换机已经收到 id 为:{}的消息",id);
 }else{
 log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
 }
 }
}

消息消费者

@Component
@Slf4j
public class ConfirmConsumer {
 public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
 @RabbitListener(queues =CONFIRM_QUEUE_NAME)
 public void receiveMsg(Message message){
 String msg=new String(message.getBody());
 log.info("接受到队列 confirm.queue 消息:{}",msg);
 }
}

结果分析

可以看到,发送了两条消息,第一条消息的 RoutingKey "key1" ,第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为 第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条 消息被直接丢弃了。

 

回退消息

mandatory参数

如果我们仅仅开启了生产者确认机制,那么当交换机接受到消息后,会直接给生产者发送确认消息,但是如果发现消息不可以路由,就会直接把消息丢弃,此时消费者接受不到消息,而且这个时候生产者也不知道消息被丢弃了,这样就导致消息丢失,我们可以通过设置mandatory参数,是的消息在传递过程中出现不可到达的目的的时候可以把消息返回给生产者

@Slf4j
@Component
public class MessageProducer implements RabbitTemplate.ConfirmCallback , 
RabbitTemplate.ReturnCallback {
 @Autowired
 private RabbitTemplate rabbitTemplate;
 //rabbitTemplate 注入之后就设置该值
 @PostConstruct
 private void init() {
 rabbitTemplate.setConfirmCallback(this);
 /**
 * true:
 * 交换机无法将消息进行路由时,会将该消息返回给生产者
 * false:
 * 如果发现消息无法进行路由,则直接丢弃
 */
 rabbitTemplate.setMandatory(true);
 //设置回退消息交给谁处理
 rabbitTemplate.setReturnCallback(this);
 }
 @GetMapping("sendMessage")
public void sendMessage(String message){
 //让消息绑定一个 id 值
 CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
 
rabbitTemplate.convertAndSend("confirm.exchange","key1",message+"key1",correlationData1)
;
 log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");
 CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());
 
rabbitTemplate.convertAndSend("confirm.exchange","key2",message+"key2",correlationData2)
;
 log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");
}
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 String id = correlationData != null ? correlationData.getId() : "";
 if (ack) {
 log.info("交换机收到消息确认成功, id:{}", id);
 } else {
 log.error("消息 id:{}未成功投递到交换机,原因是:{}", id, cause);
 }
 }
 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String 
exchange, String routingKey) {
 log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",
 new String(message.getBody()),replyText, exchange, routingKey);
 }
}

回调接口:

@Component
@Slf4j
public class MyCallBack implements 
RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
 /**
 * 交换机不管是否收到消息的一个回调方法
 * CorrelationData
 * 消息相关数据
 * ack
 * 交换机是否收到消息
 */
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 String id=correlationData!=null?correlationData.getId():"";
 if(ack){
 log.info("交换机已经收到 id 为:{}的消息",id);
 }else{
 log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
 }
 }
 //当消息无法路由的时候的回调方法
 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String 
exchange, String routingKey) {
 log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",new 
String(message.getBody()),exchange,replyText,routingKey);
 }
}

结果分析:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/579903.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis的SDS+IntSet+Dict

一)SDS 在redis中,保存key的是字符串,value往往是字符串或者是字符串的集合,可见字符串是redis中最常用的一种数据结构: 但是在redis中并没有直接使用C语言的字符串,因为C语言的字符串存在很多问题 1)获取字符串的长度需要通过运算…

算法12.从暴力递归到动态规划5

算法|12.从暴力递归到动态规划5 1.机器人行进问题 题意:假设有排成一行的N个位置记为1~N,N一定大于或等于2 开始时机器人在其中的M位置上(M一定是1~N中的一个) 如果机器人来到1位置,那么下一步只能往右来到2位置; 如果机器人来到…

stc15w404as使用keil做库,提供头文件,供调用

背景 有个项目使用需要使用库,将代码封装起来,仅仅留下调试接口,给用户使用,调试一些参数。这样工程看起来更简单,也方便客户维护。 也有一些使用场景,需要把自己的代码封装起来,这个是怕被别…

电脑msvcp120.dll缺失怎么办?由于找不到msvcp120.dll的解决方案

MSVCP120.dll文件是Windows操作系统中的一种动态链接库文件。它是由Microsoft C软件包提供的重要组件。当系统提示“MSVCP120.dll文件缺失”时,可能会导致某些应用程序无法正常运行。 以下是修复MSVCP120.dll缺失问题的几种方法: 方法一:修复…

如何在华为OD机试中获得满分?Java实现【公共子串计算】一文详解!

✅创作者:陈书予 🎉个人主页:陈书予的个人主页 🍁陈书予的个人社区,欢迎你的加入: 陈书予的社区 🌟专栏地址: Java华为OD机试真题(2022&2023) 文章目录 1、题目描述2、输入描述3、输出描述…

<学习笔记>从零开始自学Python-之-web应用框架Django( 十四)上线部署(阿里云+Nginx+uwsgi+MySQL)

好了,我们现在有了一个完整的网站,在自己电脑上跑起来没问题了,但是我们做网站肯定不只是为了在本机上自己欣赏,总要放到网上去让别人来浏览。这一章我们就完整跑一遍Django项目的生产环境部署。 1、基本原理 想让你的网站在公网…

【P38】JMeter 随机控制器(Random Controller)

文章目录 一、随机控制器(Random Controller)参数说明二、测试计划设计2.1、测试计划一2.2、测试计划二2.3、勾选忽略子控制器块 一、随机控制器(Random Controller)参数说明 可以让控制器内部的逻辑随机执行一个,一般…

如何在华为OD机试中获得满分?Java实现【24点游戏算法】一文详解!

✅创作者:陈书予 🎉个人主页:陈书予的个人主页 🍁陈书予的个人社区,欢迎你的加入: 陈书予的社区 🌟专栏地址: Java华为OD机试真题(2022&2023) 文章目录 1、题目描述2、输入描述3、输出描述…

python自动演奏Freepiano【双手合奏】

文章编写背景 玩了N久的Freepiano,碍于本人没有天赋,左右手一直没法协调。 于是,突然奇想,也是代码设计的思路: 用多线程的方式,开两个线程,然后通过按键模拟的方式,分别模拟左右手去演奏。觉…

分布式网络通信框架(一)——集群和分布式

单机聊天服务器 缺点: 受限于硬件资源,服务器所能承受的用户并发量不够大; 任意模块修改,都会导致整个项目代码重新编译、部署; 系统中,有些模块是CPU密集型,有些是IO密集型,造成…

计算机网络五 传输层

传输层 概念 传输层是指ISO/OSI模型中的第四层,在计算机网络中起着非常重要的作用。它负责数据在网络中的传输,管理数据传输的可靠性和流量控制,保证数据在网络中不会丢失或重复。 提供的服务 传输层提供的主要服务有两种,分别…

《数据库应用系统实践》------ 包裹信息管理系统

系列文章 《数据库应用系统实践》------ 包裹信息管理系统 文章目录 系列文章一、需求分析1、系统背景2、 系统功能结构(需包含功能结构框图和模块说明)3.系统功能简介 二、概念模型设计1.基本要素(符号介绍说明&…

9. Linux下实现简单的UDP请求

本文简单介绍了UDP传输层协议,并在Linux下实现简单的socket通讯 一、UDP UDP(User Datagram Protocol,用户数据报协议)是一种无连接的传输层协议,它不保证数据包的可靠性和顺序。UDP在IP协议的基础上增加了简单的差错…

阿里云服务器配置CPU内存、带宽和系统盘选择方法

阿里云服务器配置怎么选择?CPU内存、公网带宽和系统盘怎么选择?个人用户选择轻量应用服务器或ECS通用算力型u1云服务器,企业用户选择ECS计算型c7、通用型g7云服务器,阿里云服务器网分享阿里云服务器配置选择方法: 目录…

大数据周会-本周学习内容总结015

开会时间:2023.05.28 15:30 线下会议 目录 01【fhzny项目】 02【Spark】 03【调研-数仓构建】 3.1【数仓构建,流程图、架构图、使用场景】 场景选择 组件设计 构建流程 04【专利】 05【导师点评】 01【fhzny项目】 GitLabMyBatis-PlusSpringbo…

Java001——基本的Dos命令

打开CMD的方式 1、win10:开始->系统->命令提示符 win11:开始->windows工具->命令提示符 2、Win键R输入cmd 打开控制台 3、进入文件夹,按住shift键鼠标右键点击,在此…

路径规划算法:基于萤火虫优化的路径规划算法- 附代码

路径规划算法:基于萤火虫优化的路径规划算法- 附代码 文章目录 路径规划算法:基于萤火虫优化的路径规划算法- 附代码1.算法原理1.1 环境设定1.2 约束条件1.3 适应度函数 2.算法结果3.MATLAB代码4.参考文献 摘要:本文主要介绍利用智能优化算法…

[第一章 web入门]SQL注入-1

拿到题目是一篇日记,是GET型请求方式,我们可以直接在url栏中注入数据 判断注入类型,页面有回显所以不是整型注入 id 1 and 1 2 id 1 页面无回显,判断为字符型注入,闭合符应该就是单引号 id 1 order by 4-- 无回显&…

C#,码海拾贝(26)——求解“一般带状线性方程组”之C#源代码,《C#数值计算算法编程》源代码升级改进版

using System; namespace Zhou.CSharp.Algorithm { /// <summary> /// 求解线性方程组的类 LEquations /// 原作 周长发 /// 改编 深度混淆 /// </summary> public static partial class LEquations { /// <summary> /…

Reorder buffer and Reservation station

Reoder buffer(ROB) 为了解决异常问题 instruction 的decode阶段被写入缓冲区的条目&#xff0c;指令完成的话&#xff0c;向缓冲区写入结果&#xff0c;最早decode的指令&#xff08;程序顺序&#xff09;如果没有被标记异常的话&#xff0c;写入reg file register rename …